Support variance and standard deviation (#2525)

* Support variance and standard deviation

* addressed comments
Navis Ryu 2016-08-05 09:32:58 +09:00 committed by Charles Allen
parent 33dbe0800c
commit 5b3f0ccb1f
27 changed files with 2424 additions and 14 deletions

View File

@ -93,6 +93,8 @@
<argument>-c</argument>
<argument>io.druid.extensions:druid-s3-extensions</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-stats</argument>
<argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>

View File

@ -0,0 +1,152 @@
---
layout: doc_page
---
# Stats aggregator
This extension provides statistics-related aggregators, including variance and standard deviation. Make sure to [include](../../operations/including-extensions.html) `druid-stats` as an extension.
## Variance aggregator
The aggregator uses the same algorithm as Apache Hive. The following description comes from `GenericUDAFVariance` in Hive:
Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
"Algorithms for computing the sample variance: analysis and recommendations",
The American Statistician, 37 (1983) pp. 242--247.

variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)

where:

- variance is sum[x-avg^2] (this is actually n times the variance) and is updated at every step
- n is the count of elements in chunk1
- m is the count of elements in chunk2
- t1 = sum of elements in chunk1, t2 = sum of elements in chunk2

This algorithm was proven to be numerically stable by J.L. Barlow in
"Error analysis of a pairwise summation algorithm to compute sample variance",
Numer. Math, 58 (1991) pp. 583--590.
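
To make the merge step concrete, here is a small, self-contained sketch (illustrative only, not part of the extension; all names are made up) that applies the formula above to two chunks and compares the result with a direct computation over all values:

```java
// Standalone sketch: merge the variance statistics of two chunks with the
// Chan/Golub/LeVeque formula and compare against a direct computation.
public class PairwiseVarianceSketch
{
  public static void main(String[] args)
  {
    double[] chunk1 = {1.1, 2.7};
    double[] chunk2 = {3.5, 1.3};

    long n = chunk1.length;               // count of elements in chunk1
    long m = chunk2.length;               // count of elements in chunk2
    double t1 = sum(chunk1);              // sum of elements in chunk1
    double t2 = sum(chunk2);              // sum of elements in chunk2
    double variance1 = nVariance(chunk1); // n times the variance of chunk1
    double variance2 = nVariance(chunk2); // m times the variance of chunk2

    // variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2), 2)
    double merged = variance1 + variance2
                    + (double) n / (m * (m + n)) * Math.pow(((double) m / n) * t1 - t2, 2);

    double[] all = {1.1, 2.7, 3.5, 1.3};
    System.out.println(merged);         // ~3.95, computed from the two chunks
    System.out.println(nVariance(all)); // ~3.95, computed directly over all values
  }

  static double sum(double[] values)
  {
    double acc = 0;
    for (double v : values) {
      acc += v;
    }
    return acc;
  }

  // sum of squared deviations from the mean, i.e. count times the population variance
  static double nVariance(double[] values)
  {
    double mean = sum(values) / values.length;
    double acc = 0;
    for (double v : values) {
      acc += (v - mean) * (v - mean);
    }
    return acc;
  }
}
```

Both print approximately 3.95, showing that merging per-chunk statistics reproduces the sum of squared deviations computed over the full data set.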
### Pre-aggregating variance at ingestion time
To use this feature, a "variance" aggregator must be included at indexing time.
The ingestion aggregator can only apply to numeric values. If you use "variance",
then any input rows missing the value will be considered to have a value of 0.
You can specify the expected input type as one of "float", "long", or "variance" for ingestion; the default is "float".
```json
{
"type" : "variance",
"name" : <output_name>,
"fieldName" : <metric_name>,
"inputType" : <input_type>,
"estimator" : <string>
}
```
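For example, at ingestion time the aggregator is added to the `metricsSpec` of the ingestion spec like any other metric (the column names here are illustrative):
```json
"metricsSpec" : [
  { "type" : "count", "name" : "count" },
  { "type" : "variance", "name" : "index_var", "fieldName" : "index", "inputType" : "float" }
]
```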
To query for results, include either a "variance" aggregator with the "variance" input type or, more simply, a "varianceFold" aggregator in the query.
```json
{
"type" : "varianceFold",
"name" : <output_name>,
"fieldName" : <metric_name>,
"estimator" : <string>
}
```
|Property |Description |Default |
|-------------------------|------------------------------|----------------------------------|
|`estimator`|Set to "population" to compute the population variance (variance_pop) rather than the sample variance (variance_sample), which is the default.|null|
### Standard Deviation post-aggregator
To compute the standard deviation from a variance, use the "stddev" post-aggregator.
```json
{
"type": "stddev",
"name": "<output_name>",
"fieldName": "<aggregator_name>",
"estimator": <string>
}
```
## Query Examples
### Timeseries Query
```json
{
"queryType": "timeseries",
"dataSource": "testing",
"granularity": "day",
"aggregations": [
{
"type": "variance",
"name": "index_var",
"fieldName": "index_var"
}
],
"intervals": [
"2016-03-01T00:00:00.000/2016-03-20T00:00:00.000"
]
}
```
### TopN Query
```json
{
"queryType": "topN",
"dataSource": "testing",
"dimensions": ["alias"],
"threshold": 5,
"granularity": "all",
"aggregations": [
{
"type": "variance",
"name": "index_var",
"fieldName": "index"
}
],
"postAggregations": [
{
"type": "stddev",
"name": "index_stddev",
"fieldName": "index_var"
}
],
"intervals": [
"2016-03-06T00:00:00/2016-03-06T23:59:59"
]
}
```
### GroupBy Query
```json
{
"queryType": "groupBy",
"dataSource": "testing",
"dimensions": ["alias"],
"granularity": "all",
"aggregations": [
{
"type": "variance",
"name": "index_var",
"fieldName": "index"
}
],
"postAggregations": [
{
"type": "stddev",
"name": "index_stddev",
"fieldName": "index_var"
}
],
"intervals": [
"2016-03-06T00:00:00/2016-03-06T23:59:59"
]
}
```

View File

@ -30,6 +30,7 @@ Core extensions are maintained by Druid committers.
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|druid-stats|Statistics-related module, including variance and standard deviation.|[link](../development/extensions-core/stats.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
|postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|

View File

@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-stats</artifactId>
<name>druid-stats</name>
<description>druid-stats</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.stats;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import io.druid.query.aggregation.variance.StandardDeviationPostAggregator;
import io.druid.query.aggregation.variance.VarianceAggregatorFactory;
import io.druid.query.aggregation.variance.VarianceFoldingAggregatorFactory;
import io.druid.query.aggregation.variance.VarianceSerde;
import io.druid.segment.serde.ComplexMetrics;
import java.util.List;
/**
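* Druid extension module that registers the variance aggregators, the varianceFold aggregator,
* the stddev post-aggregator and the "variance" complex metric serde.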
*/
public class DruidStatsModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule().registerSubtypes(
VarianceAggregatorFactory.class,
VarianceFoldingAggregatorFactory.class,
StandardDeviationPostAggregator.class
)
);
}
@Override
public void configure(Binder binder)
{
if (ComplexMetrics.getSerdeForType("variance") == null) {
ComplexMetrics.registerSerde("variance", new VarianceSerde());
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
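* Post-aggregator that returns the square root of a variance computed by a variance aggregator,
* i.e. the standard deviation.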
*/
@JsonTypeName("stddev")
public class StandardDeviationPostAggregator implements PostAggregator
{
protected final String name;
protected final String fieldName;
protected final String estimator;
protected final boolean isVariancePop;
@JsonCreator
public StandardDeviationPostAggregator(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("estimator") String estimator
)
{
this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null");
this.name = Preconditions.checkNotNull(name, "name is null");
this.estimator = estimator;
this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
}
@Override
public Set<String> getDependentFields()
{
return Sets.newHashSet(fieldName);
}
@Override
public Comparator<Double> getComparator()
{
return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
return Math.sqrt(((VarianceAggregatorCollector) combinedAggregators.get(fieldName)).getVariance(isVariancePop));
}
@Override
@JsonProperty("name")
public String getName()
{
return name;
}
@JsonProperty("fieldName")
public String getFieldName()
{
return fieldName;
}
@JsonProperty("estimator")
public String getEstimator()
{
return estimator;
}
@Override
public String toString()
{
return "StandardDeviationPostAggregator{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
", isVariancePop='" + isVariancePop + '\'' +
'}';
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
/**
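* On-heap aggregator that accumulates float, long or pre-aggregated variance values
* into a VarianceAggregatorCollector.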
*/
public abstract class VarianceAggregator implements Aggregator
{
protected final String name;
protected final VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
public VarianceAggregator(String name)
{
this.name = name;
}
@Override
public void reset()
{
holder.reset();
}
@Override
public Object get()
{
return holder;
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("VarianceAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("VarianceAggregator does not support getLong()");
}
public static final class FloatVarianceAggregator extends VarianceAggregator
{
private final FloatColumnSelector selector;
public FloatVarianceAggregator(String name, FloatColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate()
{
holder.add(selector.get());
}
}
public static final class LongVarianceAggregator extends VarianceAggregator
{
private final LongColumnSelector selector;
public LongVarianceAggregator(String name, LongColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate()
{
holder.add(selector.get());
}
}
public static final class ObjectVarianceAggregator extends VarianceAggregator
{
private final ObjectColumnSelector selector;
public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate()
{
VarianceAggregatorCollector.combineValues(holder, selector.get());
}
}
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import java.nio.ByteBuffer;
import java.util.Comparator;
/**
*
* The algorithm used here is copied from Apache Hive. This is the description from GenericUDAFVariance:
*
* Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in
* "Algorithms for computing the sample variance: analysis and recommendations"
* The American Statistician, 37 (1983) pp. 242--247.
*
* variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
*
* where: - variance is sum[x-avg^2] (this is actually n times the variance)
* and is updated at every step. - n is the count of elements in chunk1 - m is
* the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 =
* sum of elements in chunk2.
*
* This algorithm was proven to be numerically stable by J.L. Barlow in
* "Error analysis of a pairwise summation algorithm to compute sample variance"
* Numer. Math, 58 (1991) pp. 583--590
*/
public class VarianceAggregatorCollector
{
public static boolean isVariancePop(String estimator) {
return estimator != null && estimator.equalsIgnoreCase("population");
}
public static VarianceAggregatorCollector from(ByteBuffer buffer)
{
return new VarianceAggregatorCollector(buffer.getLong(), buffer.getDouble(), buffer.getDouble());
}
public static final Comparator<VarianceAggregatorCollector> COMPARATOR = new Comparator<VarianceAggregatorCollector>()
{
@Override
public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
{
int compare = Longs.compare(o1.count, o2.count);
if (compare == 0) {
compare = Doubles.compare(o1.sum, o2.sum);
if (compare == 0) {
compare = Doubles.compare(o1.nvariance, o2.nvariance);
}
}
return compare;
}
};
static Object combineValues(Object lhs, Object rhs)
{
final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs;
final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs;
if (holder2.count == 0) {
return holder1;
}
if (holder1.count == 0) {
holder1.nvariance = holder2.nvariance;
holder1.count = holder2.count;
holder1.sum = holder2.sum;
return holder1;
}
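// pairwise merge from the class javadoc: nvariance += nvariance2 + n/(m*(n+m)) * ((m/n)*t1 - t2)^2,
// with n = holder1.count, m = holder2.count, t1 = holder1.sum, t2 = holder2.sum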
final double ratio = holder1.count / (double) holder2.count;
final double t = holder1.sum / ratio - holder2.sum;
holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t);
holder1.count += holder2.count;
holder1.sum += holder2.sum;
return holder1;
}
static int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES + Doubles.BYTES;
}
long count; // number of elements
double sum; // sum of elements
double nvariance; // sum[(x - avg)^2] (this is actually n times the variance)
public VarianceAggregatorCollector()
{
this(0, 0, 0);
}
public void reset()
{
count = 0;
sum = 0;
nvariance = 0;
}
public VarianceAggregatorCollector(long count, double sum, double nvariance)
{
this.count = count;
this.sum = sum;
this.nvariance = nvariance;
}
public VarianceAggregatorCollector add(float v)
{
count++;
sum += v;
if (count > 1) {
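// incremental update: (t * t) / (count * (count - 1)) equals (v - previousMean) * (v - newMean),
// i.e. this is algebraically equivalent to Welford's online update of the sum of squared deviations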
double t = count * v - sum;
nvariance += (t * t) / ((double) count * (count - 1));
}
return this;
}
public VarianceAggregatorCollector add(long v)
{
count++;
sum += v;
if (count > 1) {
double t = count * v - sum;
nvariance += (t * t) / ((double) count * (count - 1));
}
return this;
}
public double getVariance(boolean variancePop)
{
if (count == 0) {
// The SQL standard would return null for zero elements, but in Druid this case should not occur
throw new IllegalStateException("should not be empty holder");
} else if (count == 1) {
return 0d;
} else {
return variancePop ? nvariance / count : nvariance / (count - 1);
}
}
@JsonValue
public byte[] toByteArray()
{
final ByteBuffer buffer = toByteBuffer();
buffer.flip();
byte[] theBytes = new byte[buffer.remaining()];
buffer.get(theBytes);
return theBytes;
}
public ByteBuffer toByteBuffer()
{
return ByteBuffer.allocate(Longs.BYTES + Doubles.BYTES + Doubles.BYTES)
.putLong(count)
.putDouble(sum)
.putDouble(nvariance);
}
@VisibleForTesting
boolean equalsWithEpsilon(VarianceAggregatorCollector o, double epsilon)
{
if (this == o) {
return true;
}
if (count != o.count) {
return false;
}
if (Math.abs(sum - o.sum) > epsilon) {
return false;
}
if (Math.abs(nvariance - o.nvariance) > epsilon) {
return false;
}
return true;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VarianceAggregatorCollector that = (VarianceAggregatorCollector) o;
if (count != that.count) {
return false;
}
if (Double.compare(that.sum, sum) != 0) {
return false;
}
if (Double.compare(that.nvariance, nvariance) != 0) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result;
long temp;
result = (int) (count ^ (count >>> 32));
temp = Double.doubleToLongBits(sum);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(nvariance);
result = 31 * result + (int) (temp ^ (temp >>> 32));
return result;
}
@Override
public String toString()
{
return "VarianceHolder{" +
"count=" + count +
", sum=" + sum +
", nvariance=" + nvariance +
'}';
}
}

View File

@ -0,0 +1,294 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.metamx.common.IAE;
import com.metamx.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
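* Factory for variance aggregators. The "inputType" property selects between float, long and
* pre-aggregated "variance" inputs, and "estimator" selects population vs. sample variance.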
*/
@JsonTypeName("variance")
public class VarianceAggregatorFactory extends AggregatorFactory
{
protected static final byte CACHE_TYPE_ID = 16;
protected final String fieldName;
protected final String name;
protected final String estimator;
private final String inputType;
protected final boolean isVariancePop;
@JsonCreator
public VarianceAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("estimator") String estimator,
@JsonProperty("inputType") String inputType
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
this.estimator = estimator;
this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator);
this.inputType = inputType == null ? "float" : inputType;
}
public VarianceAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, null);
}
@Override
public String getTypeName()
{
return "variance";
}
@Override
public int getMaxIntermediateSize()
{
return VarianceAggregatorCollector.getMaxIntermediateSize();
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopAggregator();
}
if ("float".equalsIgnoreCase(inputType)) {
return new VarianceAggregator.FloatVarianceAggregator(
name,
metricFactory.makeFloatColumnSelector(fieldName)
);
} else if ("long".equalsIgnoreCase(inputType)) {
return new VarianceAggregator.LongVarianceAggregator(
name,
metricFactory.makeLongColumnSelector(fieldName)
);
} else if ("variance".equalsIgnoreCase(inputType)) {
return new VarianceAggregator.ObjectVarianceAggregator(name, selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopBufferAggregator();
}
if ("float".equalsIgnoreCase(inputType)) {
return new VarianceBufferAggregator.FloatVarianceAggregator(
name,
metricFactory.makeFloatColumnSelector(fieldName)
);
} else if ("long".equalsIgnoreCase(inputType)) {
return new VarianceBufferAggregator.LongVarianceAggregator(
name,
metricFactory.makeLongColumnSelector(fieldName)
);
} else if ("variance".equalsIgnoreCase(inputType)) {
return new VarianceBufferAggregator.ObjectVarianceAggregator(name, selector);
}
throw new IAE(
"Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new VarianceFoldingAggregatorFactory(name, name, estimator);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new VarianceAggregatorFactory(fieldName, fieldName, estimator, inputType));
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public Comparator getComparator()
{
return VarianceAggregatorCollector.COMPARATOR;
}
@Override
public Object getAggregatorStartValue()
{
return new VarianceAggregatorCollector();
}
@Override
public Object combine(Object lhs, Object rhs)
{
return VarianceAggregatorCollector.combineValues(lhs, rhs);
}
@Override
public Object finalizeComputation(Object object)
{
return ((VarianceAggregatorCollector) object).getVariance(isVariancePop);
}
@Override
public Object deserialize(Object object)
{
if (object instanceof byte[]) {
return VarianceAggregatorCollector.from(ByteBuffer.wrap((byte[]) object));
} else if (object instanceof ByteBuffer) {
return VarianceAggregatorCollector.from((ByteBuffer) object);
} else if (object instanceof String) {
return VarianceAggregatorCollector.from(
ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)))
);
}
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getEstimator()
{
return estimator;
}
@JsonProperty
public String getInputType()
{
return inputType;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
byte[] inputTypeBytes = StringUtils.toUtf8(inputType);
return ByteBuffer.allocate(2 + fieldNameBytes.length + 1 + inputTypeBytes.length)
.put(CACHE_TYPE_ID)
.put(isVariancePop ? (byte) 1 : 0)
.put(fieldNameBytes)
.put((byte) 0xFF)
.put(inputTypeBytes)
.array();
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"fieldName='" + fieldName + '\'' +
", name='" + name + '\'' +
", isVariancePop='" + isVariancePop + '\'' +
", inputType='" + inputType + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VarianceAggregatorFactory that = (VarianceAggregatorFactory) o;
// fieldName participates in hashCode(), so it must also participate in equals()
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
if (!Objects.equals(isVariancePop, that.isVariancePop)) {
return false;
}
if (!Objects.equals(inputType, that.inputType)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = fieldName.hashCode();
result = 31 * result + Objects.hashCode(name);
result = 31 * result + Objects.hashCode(isVariancePop);
result = 31 * result + Objects.hashCode(inputType);
return result;
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
/**
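* Buffer-based variance aggregator that stores count, sum and n*variance directly in the ByteBuffer.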
*/
public abstract class VarianceBufferAggregator implements BufferAggregator
{
private static final int COUNT_OFFSET = 0;
private static final int SUM_OFFSET = Longs.BYTES;
private static final int NVARIANCE_OFFSET = SUM_OFFSET + Doubles.BYTES;
protected final String name;
public VarianceBufferAggregator(String name)
{
this.name = name;
}
@Override
public void init(final ByteBuffer buf, final int position)
{
buf.putLong(position + COUNT_OFFSET, 0)
.putDouble(position + SUM_OFFSET, 0)
.putDouble(position + NVARIANCE_OFFSET, 0);
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
holder.count = buf.getLong(position);
holder.sum = buf.getDouble(position + SUM_OFFSET);
holder.nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
return holder;
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("VarianceBufferAggregator does not support getLong()");
}
@Override
public void close()
{
}
public static final class FloatVarianceAggregator extends VarianceBufferAggregator
{
private final FloatColumnSelector selector;
public FloatVarianceAggregator(String name, FloatColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
float v = selector.get();
long count = buf.getLong(position + COUNT_OFFSET) + 1;
double sum = buf.getDouble(position + SUM_OFFSET) + v;
buf.putLong(position, count);
buf.putDouble(position + SUM_OFFSET, sum);
if (count > 1) {
double t = count * v - sum;
double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}
}
public static final class LongVarianceAggregator extends VarianceBufferAggregator
{
private final LongColumnSelector selector;
public LongVarianceAggregator(String name, LongColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
long v = selector.get();
long count = buf.getLong(position + COUNT_OFFSET) + 1;
double sum = buf.getDouble(position + SUM_OFFSET) + v;
buf.putLong(position, count);
buf.putDouble(position + SUM_OFFSET, sum);
if (count > 1) {
double t = count * v - sum;
double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1));
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}
}
public static final class ObjectVarianceAggregator extends VarianceBufferAggregator
{
private final ObjectColumnSelector selector;
public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
{
super(name);
this.selector = selector;
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) selector.get();
long count = buf.getLong(position + COUNT_OFFSET);
if (count == 0) {
buf.putLong(position, holder2.count);
buf.putDouble(position + SUM_OFFSET, holder2.sum);
buf.putDouble(position + NVARIANCE_OFFSET, holder2.nvariance);
return;
}
double sum = buf.getDouble(position + SUM_OFFSET);
double nvariance = buf.getDouble(position + NVARIANCE_OFFSET);
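// same pairwise merge as VarianceAggregatorCollector.combineValues(), applied in place on the buffer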
final double ratio = count / (double) holder2.count;
final double t = sum / ratio - holder2.sum;
nvariance += holder2.nvariance + (ratio / (count + holder2.count) * t * t);
count += holder2.count;
sum += holder2.sum;
buf.putLong(position, count);
buf.putDouble(position + SUM_OFFSET, sum);
buf.putDouble(position + NVARIANCE_OFFSET, nvariance);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
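* Variance aggregator factory that always reads its input column as pre-aggregated "variance"
* objects, used to combine (fold) already-aggregated variances at query time.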
*/
@JsonTypeName("varianceFold")
public class VarianceFoldingAggregatorFactory extends VarianceAggregatorFactory
{
public VarianceFoldingAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("estimator") String estimator
)
{
super(name, fieldName, estimator, "variance");
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.collect.Ordering;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import java.nio.ByteBuffer;
import java.util.List;
/**
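* ComplexMetricSerde for the "variance" type, handling extraction at ingestion time and
* (de)serialization of VarianceAggregatorCollector values.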
*/
public class VarianceSerde extends ComplexMetricSerde
{
private static final Ordering<VarianceAggregatorCollector> comparator =
Ordering.from(VarianceAggregatorCollector.COMPARATOR).nullsFirst();
@Override
public String getTypeName()
{
return "variance";
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<VarianceAggregatorCollector> extractedClass()
{
return VarianceAggregatorCollector.class;
}
@Override
public VarianceAggregatorCollector extractValue(InputRow inputRow, String metricName)
{
Object rawValue = inputRow.getRaw(metricName);
if (rawValue instanceof VarianceAggregatorCollector) {
return (VarianceAggregatorCollector) rawValue;
}
VarianceAggregatorCollector collector = new VarianceAggregatorCollector();
List<String> dimValues = inputRow.getDimension(metricName);
if (dimValues != null && dimValues.size() > 0) {
for (String dimValue : dimValues) {
float value = Float.parseFloat(dimValue);
collector.add(value);
}
}
return collector;
}
};
}
@Override
public void deserializeColumn(
ByteBuffer byteBuffer, ColumnBuilder columnBuilder
)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy getObjectStrategy()
{
return new ObjectStrategy<VarianceAggregatorCollector>()
{
@Override
public Class<? extends VarianceAggregatorCollector> getClazz()
{
return VarianceAggregatorCollector.class;
}
@Override
public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return VarianceAggregatorCollector.from(readOnlyBuffer);
}
@Override
public byte[] toBytes(VarianceAggregatorCollector collector)
{
return collector == null ? new byte[]{} : collector.toByteArray();
}
@Override
public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2)
{
return comparator.compare(o1, o2);
}
};
}
}

View File

@ -0,0 +1 @@
io.druid.query.aggregation.stats.DruidStatsModule

View File

@ -0,0 +1,171 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
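/**
* Verifies that merging partial variance collectors (and buffer aggregators) yields the same
* result as aggregating all values directly.
*/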
public class VarianceAggregatorCollectorTest
{
private static final float[] market_upfront = new float[]{
800.0f, 800.0f, 826.0602f, 1564.6177f, 1006.4021f, 869.64374f, 809.04175f, 1458.4027f, 852.4375f, 879.9881f,
950.1468f, 712.7746f, 846.2675f, 682.8855f, 1109.875f, 594.3817f, 870.1159f, 677.511f, 1410.2781f, 1219.4321f,
979.306f, 1224.5016f, 1215.5898f, 716.6092f, 1301.0233f, 786.3633f, 989.9315f, 1609.0967f, 1023.2952f, 1367.6381f,
1627.598f, 810.8894f, 1685.5001f, 545.9906f, 1870.061f, 555.476f, 1643.3408f, 943.4972f, 1667.4978f, 913.5611f,
1218.5619f, 1273.7074f, 888.70526f, 1113.1141f, 864.5689f, 1308.582f, 785.07886f, 1363.6149f, 787.1253f,
826.0392f, 1107.2438f, 872.6257f, 1188.3693f, 911.9568f, 794.0988f, 1299.0933f, 1212.9283f, 901.3273f, 723.5143f,
1061.9734f, 602.97955f, 879.4061f, 724.2625f, 862.93134f, 1133.1351f, 948.65796f, 807.6017f, 914.525f, 1553.3485f,
1208.4567f, 679.6193f, 645.1777f, 1120.0887f, 1649.5333f, 1433.3988f, 1598.1793f, 1192.5631f, 1022.85455f,
1228.5024f, 1298.4158f, 1345.9644f, 1291.898f, 1306.4957f, 1287.7667f, 1631.5844f, 578.79596f, 1017.5732f,
782.0135f, 829.91626f, 1862.7379f, 873.3065f, 1427.0167f, 1430.2573f, 1101.9182f, 1166.1411f, 1004.94086f,
740.1837f, 865.7779f, 901.30756f, 691.9589f, 1674.3317f, 975.57794f, 1360.6948f, 755.89935f, 771.34845f,
869.30835f, 1095.6376f, 906.3738f, 988.8938f, 835.76263f, 776.70294f, 875.6834f, 1070.8363f, 835.46124f,
715.5161f, 755.64655f, 771.1005f, 764.50806f, 736.40924f, 884.8373f, 918.72284f, 893.98505f, 832.8749f,
850.995f, 767.9733f, 848.3399f, 878.6838f, 906.1019f, 1403.8302f, 936.4296f, 846.2884f, 856.4901f, 1032.2576f,
954.7542f, 1031.99f, 907.02155f, 1110.789f, 843.95215f, 1362.6506f, 884.8015f, 1684.2688f, 873.65204f, 855.7177f,
996.56415f, 1061.6786f, 962.2358f, 1019.8985f, 1056.4193f, 1198.7231f, 1108.1361f, 1289.0095f,
1069.4318f, 1001.13403f, 1030.4995f, 1734.2749f, 1063.2012f, 1447.3412f, 1234.2476f, 1144.3424f, 1049.7385f,
811.9913f, 768.4231f, 1151.0692f, 877.0794f, 1146.4231f, 902.6157f, 1355.8434f, 897.39343f, 1260.1431f, 762.8625f,
935.168f, 782.10785f, 996.2054f, 767.69214f, 1031.7415f, 775.9656f, 1374.9684f, 853.163f, 1456.6118f, 811.92523f,
989.0328f, 744.7446f, 1166.4012f, 753.105f, 962.7312f, 780.272f
};
private static final float[] market_total_market = new float[]{
1000.0f, 1000.0f, 1040.9456f, 1689.0128f, 1049.142f, 1073.4766f, 1007.36554f, 1545.7089f, 1016.9652f, 1077.6127f,
1075.0896f, 953.9954f, 1022.7833f, 937.06195f, 1156.7448f, 849.8775f, 1066.208f, 904.34064f, 1240.5255f,
1343.2325f, 1088.9431f, 1349.2544f, 1102.8667f, 939.2441f, 1109.8754f, 997.99457f, 1037.4495f, 1686.4197f,
1074.007f, 1486.2013f, 1300.3022f, 1021.3345f, 1314.6195f, 792.32605f, 1233.4489f, 805.9301f, 1184.9207f,
1127.231f, 1203.4656f, 1100.9048f, 1097.2112f, 1410.793f, 1033.4012f, 1283.166f, 1025.6333f, 1331.861f,
1039.5005f, 1332.4684f, 1011.20544f, 1029.9952f, 1047.2129f, 1057.08f, 1064.9727f, 1082.7277f, 971.0508f,
1320.6383f, 1070.1655f, 1089.6478f, 980.3866f, 1179.6959f, 959.2362f, 1092.417f, 987.0674f, 1103.4583f,
1091.2231f, 1199.6074f, 1044.3843f, 1183.2408f, 1289.0973f, 1360.0325f, 993.59125f, 1021.07117f, 1105.3834f,
1601.8295f, 1200.5272f, 1600.7233f, 1317.4584f, 1304.3262f, 1544.1082f, 1488.7378f, 1224.8271f, 1421.6487f,
1251.9062f, 1414.619f, 1350.1754f, 970.7283f, 1057.4272f, 1073.9673f, 996.4337f, 1743.9218f, 1044.5629f,
1474.5911f, 1159.2788f, 1292.5428f, 1124.2014f, 1243.354f, 1051.809f, 1143.0784f, 1097.4907f, 1010.3703f,
1326.8291f, 1179.8038f, 1281.6012f, 994.73126f, 1081.6504f, 1103.2397f, 1177.8584f, 1152.5477f, 1117.954f,
1084.3325f, 1029.8025f, 1121.3854f, 1244.85f, 1077.2794f, 1098.5432f, 998.65076f, 1088.8076f, 1008.74554f,
998.75397f, 1129.7233f, 1075.243f, 1141.5884f, 1037.3811f, 1099.1973f, 981.5773f, 1092.942f, 1072.2394f,
1154.4156f, 1311.1786f, 1176.6052f, 1107.2202f, 1102.699f, 1285.0901f, 1217.5475f, 1283.957f, 1178.8302f,
1301.7781f, 1119.2472f, 1403.3389f, 1156.6019f, 1429.5802f, 1137.8423f, 1124.9352f, 1256.4998f, 1217.8774f,
1247.8909f, 1185.71f, 1345.7817f, 1250.1667f, 1390.754f, 1224.1162f, 1361.0802f, 1190.9337f, 1310.7971f,
1466.2094f, 1366.4476f, 1314.8397f, 1522.0437f, 1193.5563f, 1321.375f, 1055.7837f, 1021.6387f, 1197.0084f,
1131.532f, 1192.1443f, 1154.2896f, 1272.6771f, 1141.5146f, 1190.8961f, 1009.36316f, 1006.9138f, 1032.5999f,
1137.3857f, 1030.0756f, 1005.25305f, 1030.0947f, 1112.7948f, 1113.3575f, 1153.9747f, 1069.6409f, 1016.13745f,
994.9023f, 1032.1543f, 999.5864f, 994.75275f, 1029.057f
};
@Test
public void testVariance()
{
Random random = new Random();
for (float[] values : Arrays.asList(market_upfront, market_total_market)) {
double sum = 0;
for (float f : values) {
sum += f;
}
final double mean = sum / values.length;
double temp = 0;
for (float f : values) {
temp += Math.pow(f - mean, 2);
}
final double variance_pop = temp / values.length;
final double variance_sample = temp / (values.length - 1);
VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
for (float f : values) {
holder.add(f);
}
Assert.assertEquals(holder.getVariance(true), variance_pop, 0.001);
Assert.assertEquals(holder.getVariance(false), variance_sample, 0.001);
for (int mergeOn : new int[] {2, 3, 5, 9}) {
List<VarianceAggregatorCollector> holders1 = Lists.newArrayListWithCapacity(mergeOn);
List<Pair<VarianceBufferAggregator, ByteBuffer>> holders2 = Lists.newArrayListWithCapacity(mergeOn);
FloatHandOver valueHandOver = new FloatHandOver();
for (int i = 0; i < mergeOn; i++) {
holders1.add(new VarianceAggregatorCollector());
holders2.add(Pair.<VarianceBufferAggregator, ByteBuffer>of(
new VarianceBufferAggregator.FloatVarianceAggregator("XX", valueHandOver),
ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize())
));
}
for (float f : values) {
valueHandOver.v = f;
int index = random.nextInt(mergeOn);
holders1.get(index).add(f);
holders2.get(index).lhs.aggregate(holders2.get(index).rhs, 0);
}
VarianceAggregatorCollector holder1 = holders1.get(0);
for (int i = 1; i < mergeOn; i++) {
holder1 = (VarianceAggregatorCollector) VarianceAggregatorCollector.combineValues(holder1, holders1.get(i));
}
ObjectHandOver collectHandOver = new ObjectHandOver();
ByteBuffer buffer = ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize());
VarianceBufferAggregator.ObjectVarianceAggregator merger = new VarianceBufferAggregator.ObjectVarianceAggregator("xxx", collectHandOver);
for (int i = 0; i < mergeOn; i++) {
collectHandOver.v = holders2.get(i).lhs.get(holders2.get(i).rhs, 0);
merger.aggregate(buffer, 0);
}
VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) merger.get(buffer, 0);
Assert.assertEquals(holder2.getVariance(true), variance_pop, 0.01);
Assert.assertEquals(holder2.getVariance(false), variance_sample, 0.01);
}
}
}
private static class FloatHandOver implements FloatColumnSelector
{
float v;
@Override
public float get()
{
return v;
}
}
private static class ObjectHandOver implements ObjectColumnSelector
{
Object v;
@Override
public Class classOfObject()
{
return v == null ? Object.class : v.getClass();
}
@Override
public Object get()
{
return v;
}
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.TestFloatColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
/**
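* Unit tests for VarianceAggregator, VarianceBufferAggregator and VarianceAggregatorFactory
* using hand-computed population and sample variances.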
*/
public class VarianceAggregatorTest
{
private VarianceAggregatorFactory aggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestFloatColumnSelector selector;
private final float[] values = {1.1f, 2.7f, 3.5f, 1.3f};
private final double[] variances_pop = new double[values.length]; // calculated
private final double[] variances_samp = new double[values.length]; // calculated
public VarianceAggregatorTest() throws Exception
{
String aggSpecJson = "{\"type\": \"variance\", \"name\": \"billy\", \"fieldName\": \"nilly\"}";
aggFactory = new DefaultObjectMapper().readValue(aggSpecJson, VarianceAggregatorFactory.class);
double sum = 0;
for (int i = 0; i < values.length; i++) {
sum += values[i];
if (i > 0) {
double mean = sum / (i + 1);
double temp = 0;
for (int j = 0; j <= i; j++) {
temp += Math.pow(values[j] - mean, 2);
}
variances_pop[i] = temp / (i + 1);
variances_samp[i] = temp / i;
}
}
}
@Before
public void setup()
{
selector = new TestFloatColumnSelector(values);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("nilly")).andReturn(new TestObjectColumnSelector(0.0f));
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(selector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testDoubleVarianceAggregator()
{
VarianceAggregator agg = (VarianceAggregator) aggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
aggregate(selector, agg);
assertValues((VarianceAggregatorCollector) agg.get(), 1, 1.1d, 0d);
aggregate(selector, agg);
assertValues((VarianceAggregatorCollector) agg.get(), 2, 3.8d, 1.28d);
aggregate(selector, agg);
assertValues((VarianceAggregatorCollector) agg.get(), 3, 7.3d, 2.9866d);
aggregate(selector, agg);
assertValues((VarianceAggregatorCollector) agg.get(), 4, 8.6d, 3.95d);
agg.reset();
assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
}
private void assertValues(VarianceAggregatorCollector holder, long count, double sum, double nvariance)
{
Assert.assertEquals(count, holder.count);
Assert.assertEquals(sum, holder.sum, 0.0001);
Assert.assertEquals(nvariance, holder.nvariance, 0.0001);
if (count == 0) {
try {
holder.getVariance(false);
Assert.fail("Should throw ISE");
}
catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("should not be empty holder"));
}
} else {
Assert.assertEquals(holder.getVariance(true), variances_pop[(int) count - 1], 0.0001);
Assert.assertEquals(holder.getVariance(false), variances_samp[(int) count - 1], 0.0001);
}
}
@Test
public void testDoubleVarianceBufferAggregator()
{
VarianceBufferAggregator agg = (VarianceBufferAggregator) aggFactory.factorizeBuffered(
colSelectorFactory
);
ByteBuffer buffer = ByteBuffer.wrap(new byte[aggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 0, 0d, 0d);
aggregate(selector, agg, buffer, 0);
assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 1, 1.1d, 0d);
aggregate(selector, agg, buffer, 0);
assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 2, 3.8d, 1.28d);
aggregate(selector, agg, buffer, 0);
assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 3, 7.3d, 2.9866d);
aggregate(selector, agg, buffer, 0);
assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 4, 8.6d, 3.95d);
}
@Test
public void testCombine()
{
VarianceAggregatorCollector holder1 = new VarianceAggregatorCollector().add(1.1f).add(2.7f);
VarianceAggregatorCollector holder2 = new VarianceAggregatorCollector().add(3.5f).add(1.3f);
VarianceAggregatorCollector expected = new VarianceAggregatorCollector(4, 8.6d, 3.95d);
Assert.assertTrue(expected.equalsWithEpsilon((VarianceAggregatorCollector) aggFactory.combine(holder1, holder2), 0.00001));
}
@Test
public void testEqualsAndHashCode() throws Exception
{
VarianceAggregatorFactory one = new VarianceAggregatorFactory("name1", "fieldName1");
VarianceAggregatorFactory oneMore = new VarianceAggregatorFactory("name1", "fieldName1");
VarianceAggregatorFactory two = new VarianceAggregatorFactory("name2", "fieldName2");
Assert.assertEquals(one.hashCode(), oneMore.hashCode());
Assert.assertTrue(one.equals(oneMore));
Assert.assertFalse(one.equals(two));
}
private void aggregate(TestFloatColumnSelector selector, VarianceAggregator agg)
{
agg.aggregate();
selector.increment();
}
private void aggregate(
TestFloatColumnSelector selector,
VarianceBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
selector.increment();
}
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.having.HavingSpec;
import io.druid.query.groupby.having.OrHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.segment.TestHelper;
import org.joda.time.Period;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
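* GroupBy query tests exercising the variance aggregator and the stddev post-aggregator,
* including having and limit specs.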
*/
@RunWith(Parameterized.class)
public class VarianceGroupByQueryTest
{
private final GroupByQueryConfig config;
private final QueryRunner<Row> runner;
private final GroupByQueryRunnerFactory factory;
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return GroupByQueryRunnerTest.constructorFeeder();
}
public VarianceGroupByQueryTest(GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
{
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
}
@Test
public void testGroupByVarianceOnly()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Arrays.<AggregatorFactory>asList(VarianceTestHelper.indexVarianceAggr))
.setPostAggregatorSpecs(Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr))
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
VarianceTestHelper.RowBuilder builder =
new VarianceTestHelper.RowBuilder(new String[]{"alias", "index_stddev", "index_var"});
List<Row> expectedResults = builder
.add("2011-04-01", "automotive", 0d, 0d)
.add("2011-04-01", "business", 0d, 0d)
.add("2011-04-01", "entertainment", 0d, 0d)
.add("2011-04-01", "health", 0d, 0d)
.add("2011-04-01", "mezzanine", 737.0179286322613d, 543195.4271253889d)
.add("2011-04-01", "news", 0d, 0d)
.add("2011-04-01", "premium", 726.6322593583996d, 527994.4403402924d)
.add("2011-04-01", "technology", 0d, 0d)
.add("2011-04-01", "travel", 0d, 0d)
.add("2011-04-02", "automotive", 0d, 0d)
.add("2011-04-02", "business", 0d, 0d)
.add("2011-04-02", "entertainment", 0d, 0d)
.add("2011-04-02", "health", 0d, 0d)
.add("2011-04-02", "mezzanine", 611.3420766546617d, 373739.13468843425d)
.add("2011-04-02", "news", 0d, 0d)
.add("2011-04-02", "premium", 621.3898134843073d, 386125.30030206224d)
.add("2011-04-02", "technology", 0d, 0d)
.add("2011-04-02", "travel", 0d, 0d)
.build();
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupBy()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
VarianceTestHelper.rowsCount,
VarianceTestHelper.indexVarianceAggr,
new LongSumAggregatorFactory("idx", "index")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(VarianceTestHelper.stddevOfIndexPostAggr)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
VarianceTestHelper.RowBuilder builder =
new VarianceTestHelper.RowBuilder(new String[]{"alias", "rows", "idx", "index_stddev", "index_var"});
List<Row> expectedResults = builder
.add("2011-04-01", "automotive", 1L, 135L, 0d, 0d)
.add("2011-04-01", "business", 1L, 118L, 0d, 0d)
.add("2011-04-01", "entertainment", 1L, 158L, 0d, 0d)
.add("2011-04-01", "health", 1L, 120L, 0d, 0d)
.add("2011-04-01", "mezzanine", 3L, 2870L, 737.0179286322613d, 543195.4271253889d)
.add("2011-04-01", "news", 1L, 121L, 0d, 0d)
.add("2011-04-01", "premium", 3L, 2900L, 726.6322593583996d, 527994.4403402924d)
.add("2011-04-01", "technology", 1L, 78L, 0d, 0d)
.add("2011-04-01", "travel", 1L, 119L, 0d, 0d)
.add("2011-04-02", "automotive", 1L, 147L, 0d, 0d)
.add("2011-04-02", "business", 1L, 112L, 0d, 0d)
.add("2011-04-02", "entertainment", 1L, 166L, 0d, 0d)
.add("2011-04-02", "health", 1L, 113L, 0d, 0d)
.add("2011-04-02", "mezzanine", 3L, 2447L, 611.3420766546617d, 373739.13468843425d)
.add("2011-04-02", "news", 1L, 114L, 0d, 0d)
.add("2011-04-02", "premium", 3L, 2505L, 621.3898134843073d, 386125.30030206224d)
.add("2011-04-02", "technology", 1L, 97L, 0d, 0d)
.add("2011-04-02", "travel", 1L, 126L, 0d, 0d)
.build();
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
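  // Applies a having spec on the stddev post-aggregator (keeping groups whose stddev exceeds 15)
  // and then re-runs the same query with an ascending, two-row limit spec.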
@Test
public void testPostAggHavingSpec()
{
VarianceTestHelper.RowBuilder expect = new VarianceTestHelper.RowBuilder(
new String[]{"alias", "rows", "index", "index_var", "index_stddev"}
);
List<Row> expectedResults = expect
.add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
.add("2011-04-01", "mezzanine", 6L, 4420L, 254083.76447001836, 504.06722217380724)
.add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(VarianceTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
VarianceTestHelper.rowsCount,
VarianceTestHelper.indexLongSum,
VarianceTestHelper.indexVarianceAggr
)
)
.setPostAggregatorSpecs(ImmutableList.<PostAggregator>of(VarianceTestHelper.stddevOfIndexPostAggr))
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(
new OrHavingSpec(
ImmutableList.<HavingSpec>of(
new GreaterThanHavingSpec(VarianceTestHelper.stddevOfIndexMetric, 15L) // 3 rows
)
)
)
.build();
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
query = query.withLimitSpec(
new DefaultLimitSpec(
Arrays.<OrderByColumnSpec>asList(
OrderByColumnSpec.asc(
VarianceTestHelper.stddevOfIndexMetric
)
), 2
)
);
expectedResults = expect
.add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847)
.add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106)
.build();
results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import io.druid.segment.data.ObjectStrategy;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Random;
public class VarianceSerdeTest
{
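  // Round-trips a VarianceAggregatorCollector through the serde's ObjectStrategy after each added
  // sample and checks that the bytes match the collector's own serialization.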
@Test
public void testSerde()
{
Random r = new Random();
VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
ObjectStrategy strategy = new VarianceSerde().getObjectStrategy();
Assert.assertEquals(VarianceAggregatorCollector.class, strategy.getClazz());
for (int i = 0; i < 100; i++) {
byte[] array = strategy.toBytes(holder);
Assert.assertArrayEquals(array, holder.toByteArray());
Assert.assertEquals(holder, strategy.fromByteBuffer(ByteBuffer.wrap(array), array.length));
holder.add(r.nextFloat());
}
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.stats.DruidStatsModule;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Shared fixtures for the variance tests: aggregators, post-aggregators, and a row builder
 * for declaring expected results.
 */
public class VarianceTestHelper extends QueryRunnerTestHelper
{
static {
DruidStatsModule module = new DruidStatsModule();
module.configure(null);
}
public static final String indexVarianceMetric = "index_var";
public static final VarianceAggregatorFactory indexVarianceAggr = new VarianceAggregatorFactory(
indexVarianceMetric,
indexMetric
);
public static final String stddevOfIndexMetric = "index_stddev";
public static final PostAggregator stddevOfIndexPostAggr = new StandardDeviationPostAggregator(
stddevOfIndexMetric,
indexVarianceMetric,
null
);
public static final List<AggregatorFactory> commonPlusVarAggregators = Arrays.asList(
rowsCount,
indexDoubleSum,
qualityUniques,
indexVarianceAggr
);
public static class RowBuilder
{
private final String[] names;
private final List<Row> rows = Lists.newArrayList();
public RowBuilder(String[] names)
{
this.names = names;
}
public RowBuilder add(final String timestamp, Object... values)
{
rows.add(build(timestamp, values));
return this;
}
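    // Returns a snapshot of the accumulated rows and clears the builder, so the same instance
    // can be reused for the next expected-result list.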
public List<Row> build()
{
try {
return Lists.newArrayList(rows);
}
finally {
rows.clear();
}
}
public Row build(final String timestamp, Object... values)
{
Preconditions.checkArgument(names.length == values.length);
Map<String, Object> theVals = Maps.newHashMap();
for (int i = 0; i < values.length; i++) {
theVals.put(names[i], values[i]);
}
DateTime ts = new DateTime(timestamp);
return new MapBasedRow(ts, theVals);
}
}
}
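For orientation, here is a minimal sketch of the wiring this helper sets up: a variance aggregator whose output name is then read by the stddev post-aggregator. The `VarianceWiringSketch` class and the `cost` column are illustrative only, not part of the patch; the constructors are the two-argument and three-argument forms used by `VarianceTestHelper` above.

```java
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.variance.StandardDeviationPostAggregator;
import io.druid.query.aggregation.variance.VarianceAggregatorFactory;

public class VarianceWiringSketch
{
  public static void main(String[] args)
  {
    // Variance of the hypothetical "cost" column, emitted as "cost_var".
    AggregatorFactory costVar = new VarianceAggregatorFactory("cost_var", "cost");

    // Square root of the "cost_var" aggregate, emitted as "cost_stddev"; a null estimator
    // keeps the default sample (rather than population) variance.
    PostAggregator costStddev = new StandardDeviationPostAggregator("cost_stddev", "cost_var", null);

    System.out.println(costVar.getName() + " feeds " + costStddev.getName());
  }
}
```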

View File

@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.TestHelper;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@RunWith(Parameterized.class)
public class VarianceTimeseriesQueryTest
{
@Parameterized.Parameters(name = "{0}:descending={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return TimeseriesQueryRunnerTest.constructorFeeder();
}
private final QueryRunner runner;
private final boolean descending;
public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending)
{
this.runner = runner;
this.descending = descending;
}
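  // A null filter on a dimension that does not exist matches every row; the expected variance and
  // stddev differ slightly between ascending and descending scans because the floating-point
  // accumulation order changes.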
@Test
public void testTimeseriesWithNullFilterOnNonExistentDimension()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(VarianceTestHelper.dataSource)
.granularity(VarianceTestHelper.dayGran)
.filters("bobby", null)
.intervals(VarianceTestHelper.firstToThird)
.aggregators(VarianceTestHelper.commonPlusVarAggregators)
.postAggregators(
Arrays.<PostAggregator>asList(
VarianceTestHelper.addRowsIndexConstant,
VarianceTestHelper.stddevOfIndexPostAggr
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
VarianceTestHelper.of(
"rows", 13L,
"index", 6626.151596069336,
"addRowsIndexConstant", 6640.151596069336,
"uniques", VarianceTestHelper.UNIQUES_9,
"index_var", descending ? 368885.6897238851 : 368885.689155086,
"index_stddev", descending ? 607.3596049490657 : 607.35960448081
)
)
),
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
VarianceTestHelper.of(
"rows", 13L,
"index", 5833.2095947265625,
"addRowsIndexConstant", 5847.2095947265625,
"uniques", VarianceTestHelper.UNIQUES_9,
"index_var", descending ? 259061.6037088883 : 259061.60216419376,
"index_stddev", descending ? 508.9809463122252 : 508.98094479478675
)
)
)
);
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, new HashMap<String, Object>()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
assertExpectedResults(expectedResults, results);
}
private <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> results)
{
if (descending) {
expectedResults = TestHelper.revert(expectedResults);
}
TestHelper.assertExpectedResults(expectedResults, results);
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.variance;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerTest;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class VarianceTopNQueryTest
{
@Parameterized.Parameters
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return TopNQueryRunnerTest.constructorFeeder();
}
private final QueryRunner runner;
public VarianceTopNQueryTest(
QueryRunner runner
)
{
this.runner = runner;
}
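  // TopN over the full interval, ordered by unique count, with "index_var" computed by the
  // variance aggregator added to the common aggregator set for each of the top three markets.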
@Test
public void testFullOnTopNOverUniques()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.uniqueMetric)
.threshold(3)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
VarianceTestHelper.commonPlusVarAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.put("index_var", 439.3851694586573D)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.put("index_var", 27679.900887366413D)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.put("index_var", 79699.9780741607D)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
private Sequence<Result<TopNResultValue>> assertExpectedResults(
Iterable<Result<TopNResultValue>> expectedResults,
TopNQuery query
)
{
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(query, ImmutableMap.<String, Object>of());
TestHelper.assertExpectedResults(expectedResults, retval);
return retval;
}
}

View File

@ -89,6 +89,7 @@
<module>extensions-core/datasketches</module>
<module>extensions-core/hdfs-storage</module>
<module>extensions-core/histogram</module>
<module>extensions-core/stats</module>
<module>extensions-core/kafka-eight</module>
<module>extensions-core/kafka-extraction-namespace</module>
<module>extensions-core/kafka-indexing-service</module>

View File

@ -37,7 +37,7 @@ import java.util.Set;
*/
public class ArithmeticPostAggregator implements PostAggregator
{
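  // DEFAULT_COMPARATOR is widened from private to public below, presumably so the new stats
  // post-aggregators can reuse the same numeric comparator.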
private static final Comparator DEFAULT_COMPARATOR = new Comparator()
public static final Comparator DEFAULT_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o, Object o1)

View File

@ -386,6 +386,22 @@ public class GroupByQuery extends BaseQuery<Row>
);
}
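  // Returns a copy of this query with only the limit spec replaced; the variance group-by test
  // uses it to re-run a having query with an added ordering and row limit.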
public GroupByQuery withLimitSpec(final LimitSpec limitSpec)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
getDimFilter(),
getGranularity(),
getDimensions(),
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
limitSpec,
getContext()
);
}
public static class Builder
{
private DataSource dataSource;

View File

@ -778,6 +778,22 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return metricDesc != null ? metricDesc.getType() : null;
}
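  // Maps a metric's storage type to the Java class its column selector exposes; complex metrics
  // delegate to the registered serde's ObjectStrategy.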
public Class getMetricClass(String metric)
{
MetricDesc metricDesc = metricDescs.get(metric);
switch (metricDesc.getCapabilities().getType()) {
case COMPLEX:
return ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
case FLOAT:
return Float.TYPE;
case LONG:
return Long.TYPE;
case STRING:
return String.class;
}
return null;
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));

View File

@ -21,13 +21,11 @@ package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.metamx.common.UOE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
@ -57,8 +55,6 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -74,7 +70,6 @@ import java.util.Map;
*/
public class IncrementalIndexStorageAdapter implements StorageAdapter
{
private static final Splitter SPLITTER = Splitter.on(",");
private static final NullDimensionSelector NULL_DIMENSION_SELECTOR = new NullDimensionSelector();
private final IncrementalIndex index;
@ -304,7 +299,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
if (Thread.interrupted()) {
throw new QueryInterruptedException( new InterruptedException());
throw new QueryInterruptedException(new InterruptedException());
}
boolean foundMatched = false;
@ -529,14 +524,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final Integer metricIndexInt = index.getMetricIndex(column);
if (metricIndexInt != null) {
final int metricIndex = metricIndexInt;
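      // The selector's class is now resolved once via IncrementalIndex.getMetricClass instead of
      // through the complex-metric serde, so primitive metric types can be selected as objects too.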
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column));
final Class classOfObject = index.getMetricClass(column);
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return serde.getObjectStrategy().getClazz();
return classOfObject;
}
@Override

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
@ -100,6 +101,7 @@ public class QueryRunnerTestHelper
public static final QueryGranularity dayGran = QueryGranularities.DAY;
public static final QueryGranularity allGran = QueryGranularities.ALL;
public static final String timeDimension = "__time";
public static final String marketDimension = "market";
public static final String qualityDimension = "quality";
public static final String placementDimension = "placement";
@ -119,9 +121,9 @@ public class QueryRunnerTestHelper
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", "__time");
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", indexMetric);
public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", timeDimension);
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", indexMetric);
public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
public static final String JS_RESET_0 = "function reset() { return 0; }";
public static final JavaScriptAggregatorFactory jsIndexSumIfPlacementishA = new JavaScriptAggregatorFactory(
@ -519,4 +521,13 @@ public class QueryRunnerTestHelper
}
};
}
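  // Builds an ImmutableMap from alternating key/value arguments; keys are stringified with
  // String.valueOf. Used by the variance timeseries test to declare expected result values.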
public static Map<String, Object> of(Object... keyvalues)
{
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
for (int i = 0; i < keyvalues.length; i += 2) {
builder.put(String.valueOf(keyvalues[i]), keyvalues[i + 1]);
}
return builder.build();
}
}

View File

@ -695,7 +695,7 @@ public class SelectQueryRunnerTest
if (acHolder.getEvent().get(ex.getKey()) instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
Assert.assertEquals(ex.getValue(), actVal);
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}
}