Fixed buckets histogram aggregator (#6638)

* Fixed buckets histogram aggregator

* PR comments

* More PR comments

* Checkstyle

* TeamCity

* More TeamCity

* PR comment

* PR comment

* Fix doc formatting
Jonathan Wei 2019-01-17 14:51:16 -08:00 committed by GitHub
parent 161dac1d23
commit 68f744ec0a
26 changed files with 4942 additions and 26 deletions

NOTICE

@ -92,3 +92,9 @@ This product contains modified versions of the Dockerfile and related configurat
* https://github.com/sequenceiq/hadoop-docker/
* COMMIT TAG:
* update this when this patch is committed
This product contains fixed bins histogram percentile computation code adapted from Netflix Spectator:
* LICENSE:
* https://github.com/Netflix/spectator/blob/master/LICENSE (Apache License, Version 2.0)
* HOMEPAGE:
* https://github.com/Netflix/spectator


@ -61,6 +61,28 @@
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>


@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.druid.query.aggregation.histogram.FixedBucketsHistogram;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class FixedHistogramAddBenchmark
{
private static final int LOWER_LIMIT = 0;
private static final int UPPER_LIMIT = 100000;
// Number of samples
@Param({"100000", "1000000"})
int numEvents;
// Number of buckets
@Param({"10", "100", "1000", "10000", "100000"})
int numBuckets;
private FixedBucketsHistogram fixedHistogramForAdds;
private int[] randomValues;
private float[] normalDistributionValues;
@Setup
public void setup()
{
randomValues = new int[numEvents];
Random r = ThreadLocalRandom.current();
for (int i = 0; i < numEvents; i++) {
randomValues[i] = r.nextInt(UPPER_LIMIT);
}
fixedHistogramForAdds = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
NormalDistribution normalDistribution = new NormalDistribution(50000, 10000);
normalDistributionValues = new float[numEvents];
for (int i = 0; i < numEvents; i++) {
normalDistributionValues[i] = (float) normalDistribution.sample();
}
}
@Benchmark
public void addFixedHisto(Blackhole bh)
{
fixedHistogramForAdds = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
for (int i = 0; i < numEvents; i++) {
fixedHistogramForAdds.add(randomValues[i]);
}
bh.consume(fixedHistogramForAdds);
}
@Benchmark
public void addFixedHistoNormal(Blackhole bh)
{
fixedHistogramForAdds = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
for (int i = 0; i < numEvents; i++) {
fixedHistogramForAdds.add(normalDistributionValues[i]);
}
bh.consume(fixedHistogramForAdds);
}
}


@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.histogram.FixedBucketsHistogram;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class FixedHistogramBenchmark
{
private static final Logger log = new Logger(FixedHistogramBenchmark.class);
private static final int LOWER_LIMIT = 0;
private static final int UPPER_LIMIT = 100000;
// Number of samples
@Param({"1000000"})
int numEvents;
// Number of buckets
@Param({"10", "100", "1000", "10000", "100000"})
int numBuckets;
private FixedBucketsHistogram fixedHistogram;
private FixedBucketsHistogram fixedHistogram2;
private FixedBucketsHistogram fixedHistogram3;
private FixedBucketsHistogram fixedHistogramForSparseLower;
private FixedBucketsHistogram fixedHistogramForSparseUpper;
private int[] randomValues;
private byte[] fixedFullSerializedAlready;
private byte[] fixedSparseLowerSerialized;
private byte[] fixedSparseUpperSerialized;
private double[] percentilesForFixed = new double[]{12.5, 25, 50, 75, 98};
@Setup
public void setup()
{
fixedHistogram = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
fixedHistogram2 = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
(int) Math.round(numBuckets * 1.5),
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
fixedHistogram3 = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
fixedHistogramForSparseLower = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
fixedHistogramForSparseUpper = new FixedBucketsHistogram(
LOWER_LIMIT,
UPPER_LIMIT,
numBuckets,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
randomValues = new int[numEvents];
Random r = ThreadLocalRandom.current();
for (int i = 0; i < numEvents; i++) {
randomValues[i] = r.nextInt(UPPER_LIMIT);
fixedHistogram.add(randomValues[i]);
fixedHistogram2.add(randomValues[i]);
fixedHistogram3.add(randomValues[i]);
if (randomValues[i] < UPPER_LIMIT * 0.4) {
fixedHistogramForSparseLower.add(randomValues[i]);
}
if (randomValues[i] > UPPER_LIMIT * 0.6) {
fixedHistogramForSparseUpper.add(randomValues[i]);
}
}
fixedFullSerializedAlready = fixedHistogram.toBytesFull(true);
fixedSparseLowerSerialized = fixedHistogramForSparseLower.toBytesSparse(fixedHistogram.getNonEmptyBucketCount());
fixedSparseUpperSerialized = fixedHistogramForSparseUpper.toBytesSparse(fixedHistogram.getNonEmptyBucketCount());
}
@Benchmark
public void mergeFixedDifferentBuckets(Blackhole bh)
{
FixedBucketsHistogram copy = fixedHistogram.getCopy();
copy.combineHistogram(fixedHistogram2);
bh.consume(copy);
}
@Benchmark
public void mergeFixedSameBuckets(Blackhole bh)
{
FixedBucketsHistogram copy = fixedHistogram.getCopy();
copy.combineHistogram(fixedHistogram3);
bh.consume(copy);
}
@Benchmark
public void getPercentilesFixed(Blackhole bh)
{
float[] percentiles = fixedHistogram.percentilesFloat(percentilesForFixed);
bh.consume(percentiles);
}
@Benchmark
public void serializeFixedSparseLower(Blackhole bh)
{
byte[] sparseSerialized = fixedHistogramForSparseLower.toBytesSparse(fixedHistogramForSparseUpper.getNonEmptyBucketCount());
bh.consume(sparseSerialized);
}
@Benchmark
public void deserializeFixedSparseLower(Blackhole bh)
{
FixedBucketsHistogram fixedBucketsHistogram = FixedBucketsHistogram.fromBytes(fixedSparseLowerSerialized);
bh.consume(fixedBucketsHistogram);
}
@Benchmark
public void serializeFixedSparseUpper(Blackhole bh)
{
byte[] sparseSerialized = fixedHistogramForSparseUpper.toBytesSparse(fixedHistogramForSparseUpper.getNonEmptyBucketCount());
bh.consume(sparseSerialized);
}
@Benchmark
public void deserializeFixedSparseUpper(Blackhole bh)
{
FixedBucketsHistogram fixedBucketsHistogram = FixedBucketsHistogram.fromBytes(fixedSparseUpperSerialized);
bh.consume(fixedBucketsHistogram);
}
@Benchmark
public void serializeFixedFull(Blackhole bh)
{
byte[] fullSerialized = fixedHistogram.toBytesFull(true);
bh.consume(fullSerialized);
}
@Benchmark
public void deserializeFixedFull(Blackhole bh)
{
FixedBucketsHistogram fixedBucketsHistogram = FixedBucketsHistogram.fromBytes(fixedFullSerializedAlready);
bh.consume(fixedBucketsHistogram);
}
}


@ -1,6 +1,6 @@
---
layout: doc_page
title: "Approximate Histogram aggregator"
title: "Approximate Histogram aggregators"
---
<!--
@ -22,10 +22,14 @@ title: "Approximate Histogram aggregator"
~ under the License.
-->
# Approximate Histogram aggregator
# Approximate Histogram aggregators
Make sure to [include](../../operations/including-extensions.html) `druid-histogram` as an extension.
The `druid-histogram` extension provides an approximate histogram aggregator and a fixed buckets histogram aggregator.
## Approximate Histogram aggregator
This aggregator is based on
[http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf](http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf)
to compute approximate histograms, with the following modifications:
@ -92,17 +96,148 @@ query.
|`numBuckets` |Number of output buckets for the resulting histogram. Bucket intervals are dynamic, based on the range of the underlying data. Use a post-aggregator to have finer control over the bucketing scheme|7|
|`lowerLimit`/`upperLimit`|Restrict the approximation to the given range. The values outside this range will be aggregated into two centroids. Counts of values outside this range are still maintained. |-INF/+INF|
## Fixed Buckets Histogram
### Approximate Histogram post-aggregators
The fixed buckets histogram aggregator builds a histogram on a numeric column, with evenly-sized buckets across a specified value range. Values outside of the range are handled based on a user-specified outlier handling mode.
This histogram supports the min/max/quantiles post-aggregators but does not support the bucketing post-aggregators.
### When to use
The accuracy/usefulness of the fixed buckets histogram is extremely data-dependent; it is provided to support special use cases where the user has a great deal of prior information about the data being aggregated and knows that a fixed buckets implementation is suitable.
For general histogram and quantile use cases, the [DataSketches Quantiles Sketch](../extensions-core/datasketches-quantiles.html) extension is recommended.
### Properties
|Property |Description |Default |
|-------------------------|------------------------------|----------------------------------|
|`type`|Type of the aggregator. Must be `fixedBucketsHistogram`.|No default, must be specified|
|`name`|Column name for the aggregator.|No default, must be specified|
|`fieldName`|Column name of the input to the aggregator.|No default, must be specified|
|`lowerLimit`|Lower limit of the histogram. |No default, must be specified|
|`upperLimit`|Upper limit of the histogram. |No default, must be specified|
|`numBuckets`|Number of buckets for the histogram. The range [lowerLimit, upperLimit] will be divided into `numBuckets` intervals of equal size.|10|
|`outlierHandlingMode`|Specifies how values outside of [lowerLimit, upperLimit] will be handled. Supported modes are "ignore", "overflow", and "clip". See [outlier handling modes](#outlier-handling-modes) for more details.|No default, must be specified|
An example aggregator spec is shown below:
```json
{
"type" : "fixedBucketsHistogram",
"name" : <output_name>,
"fieldName" : <metric_name>,
"numBuckets" : <integer>,
"lowerLimit" : <double>,
"upperLimit" : <double>,
"outlierHandlingMode": <mode>
}
```
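For reference, the same aggregator can be constructed programmatically against the factory class added in this patch. This is a minimal sketch; the column names and limits are made up for illustration and are not part of any shipped example.
```java
import org.apache.druid.query.aggregation.histogram.FixedBucketsHistogram;
import org.apache.druid.query.aggregation.histogram.FixedBucketsHistogramAggregatorFactory;

// Java equivalent of the JSON spec above; "latencyHistogram"/"latencyMs" are hypothetical names.
FixedBucketsHistogramAggregatorFactory factory = new FixedBucketsHistogramAggregatorFactory(
    "latencyHistogram",   // name: output column
    "latencyMs",          // fieldName: input column
    100,                  // numBuckets
    0.0,                  // lowerLimit
    1000.0,               // upperLimit
    FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
```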
### Outlier handling modes
The outlier handling mode specifies what should be done with values outside of the histogram's range. There are three supported modes:
- `ignore`: Throw away outlier values.
- `overflow`: A count of outlier values will be tracked by the histogram, available in the `lowerOutlierCount` and `upperOutlierCount` fields.
- `clip`: Outlier values will be clipped to the `lowerLimit` or the `upperLimit` and included in the histogram.
If you don't care about outliers, `ignore` is the cheapest option performance-wise. There is currently no difference in storage size among the modes.
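A minimal sketch of the three modes, assuming (as described under output fields below) that `getCount()` reflects only values that landed in a bucket:
```java
// Sketch only: feed one in-range value and one value above upperLimit under each mode.
for (FixedBucketsHistogram.OutlierHandlingMode mode : FixedBucketsHistogram.OutlierHandlingMode.values()) {
  FixedBucketsHistogram h = new FixedBucketsHistogram(0.0, 100.0, 10, mode);
  h.add(50.0);   // in range: counted in a bucket under every mode
  h.add(250.0);  // above upperLimit: dropped (ignore), tracked as an outlier (overflow), or clipped to 100 (clip)
  // count is 1 for ignore/overflow and 2 for clip, since clipped values enter the histogram
  System.out.println(mode + ": count=" + h.getCount());
}
```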
### Output fields
The histogram aggregator's output object has the following fields:
- `lowerLimit`: Lower limit of the histogram
- `upperLimit`: Upper limit of the histogram
- `numBuckets`: Number of histogram buckets
- `outlierHandlingMode`: Outlier handling mode
- `count`: Total number of values contained in the histogram, excluding outliers
- `lowerOutlierCount`: Count of outlier values below `lowerLimit`. Only used if the outlier mode is `overflow`.
- `upperOutlierCount`: Count of outlier values above `upperLimit`. Only used if the outlier mode is `overflow`.
- `missingValueCount`: Count of null values seen by the histogram.
- `max`: Max value seen by the histogram. This does not include outlier values.
- `min`: Min value seen by the histogram. This does not include outlier values.
- `histogram`: An array of longs with size `numBuckets`, containing the bucket counts
### Ingesting existing histograms
It is also possible to ingest existing fixed buckets histograms. The input must be a Base64 string encoding a byte array that contains a serialized histogram object. Both "full" and "sparse" formats can be used. Please see [Serialization formats](#serialization-formats) below for details.
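A minimal sketch of producing such an input value, using the serialization and Base64 helpers that this patch's aggregator code relies on; the surrounding setup is illustrative.
```java
import org.apache.commons.codec.binary.Base64;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.histogram.FixedBucketsHistogram;

// Build a histogram elsewhere, serialize it with the full format, and Base64-encode it.
FixedBucketsHistogram h = new FixedBucketsHistogram(0.0, 100.0, 10, FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW);
h.add(12.0);
h.add(87.5);
String ingestible = StringUtils.fromUtf8(Base64.encodeBase64(h.toBytesFull(true)));
// At ingestion time the aggregator decodes values like this via FixedBucketsHistogram.fromBase64(...)
// and folds them into the row's histogram.
```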
### Serialization formats
#### Full serialization format
This format includes the full histogram bucket count array in the serialization format.
```
byte: serialization version, must be 0x01
byte: encoding mode, 0x01 for full
double: lowerLimit
double: upperLimit
int: numBuckets
byte: outlier handling mode (0x00 for `ignore`, 0x01 for `overflow`, and 0x02 for `clip`)
long: count, total number of values contained in the histogram, excluding outliers
long: lowerOutlierCount
long: upperOutlierCount
long: missingValueCount
double: max
double: min
array of longs: bucket counts for the histogram
```
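A sketch of the resulting size, simply summing the fields listed above and assuming no padding between them; this mirrors, but is not taken from, the sizing the aggregator factory does via `SERDE_HEADER_SIZE` and `getFullStorageSize(numBuckets)`.
```java
// Expected byte size of the full serialization format described above.
static int fullFormatSize(int numBuckets)
{
  return 2                          // serialization version + encoding mode bytes
         + 2 * Double.BYTES         // lowerLimit, upperLimit
         + Integer.BYTES            // numBuckets
         + 1                        // outlier handling mode byte
         + 4 * Long.BYTES           // count, lowerOutlierCount, upperOutlierCount, missingValueCount
         + 2 * Double.BYTES         // max, min
         + numBuckets * Long.BYTES; // bucket counts
}
```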
#### Sparse serialization format
This format represents the histogram bucket counts as (bucketNum, count) pairs. It is used when fewer than half of the histogram's buckets have values.
```
byte: serialization version, must be 0x01
byte: encoding mode, 0x02 for sparse
double: lowerLimit
double: upperLimit
int: numBuckets
byte: outlier handling mode (0x00 for `ignore`, 0x01 for `overflow`, and 0x02 for `clip`)
long: count, total number of values contained in the histogram, excluding outliers
long: lowerOutlierCount
long: upperOutlierCount
long: missingValueCount
double: max
double: min
int: number of following (bucketNum, count) pairs
sequence of (int, long) pairs:
int: bucket number
long: bucket count
```
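A sketch of the choice between the two formats under that rule; the method names are the ones used by the benchmarks in this patch, but the exact threshold inside the histogram's own serialization code is an assumption.
```java
// Sparse encoding pays off when fewer than half of the buckets are occupied.
static byte[] serialize(FixedBucketsHistogram h, int numBuckets)
{
  int nonEmpty = h.getNonEmptyBucketCount();
  return nonEmpty < numBuckets / 2
         ? h.toBytesSparse(nonEmpty)  // (bucketNum, count) pairs
         : h.toBytesFull(true);       // dense bucket count array
}
```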
### Combining histograms with different bucketing schemes
It is possible to combine two histograms with different bucketing schemes (lowerLimit, upperLimit, numBuckets) together.
The bucketing scheme of the "left hand" histogram will be preserved (i.e., when running a query, the bucketing schemes specified in the query's histogram aggregators will be preserved).
When merging, we assume that values are evenly distributed within the buckets of the "right hand" histogram.
When the right-hand histogram contains outliers (when using `overflow` mode), we assume that all of the outliers counted in the right-hand histogram will be outliers in the left-hand histogram as well.
For performance and accuracy reasons, we recommend avoiding aggregation of histograms with different bucketing schemes if possible.
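A sketch of that even-distribution assumption (this is not the `combineHistogram()` implementation, and rounding is simplified): each right-hand bucket's count is split across the left-hand buckets in proportion to how much of the right-hand bucket's value range they overlap.
```java
// Fold one right-hand bucket [rightLow, rightHigh) holding rightCount values into the
// left-hand bucket counts, assuming values are spread evenly within the right-hand bucket.
static void foldBucket(long[] leftCounts, double leftLowerLimit, double leftBucketSize,
                       double rightLow, double rightHigh, long rightCount)
{
  double rightWidth = rightHigh - rightLow;
  for (int i = 0; i < leftCounts.length; i++) {
    double lo = leftLowerLimit + i * leftBucketSize;
    double hi = lo + leftBucketSize;
    double overlap = Math.min(hi, rightHigh) - Math.max(lo, rightLow);
    if (overlap > 0) {
      // Proportional share of the right-hand bucket's count; real code must also
      // conserve the total across buckets rather than rounding each share independently.
      leftCounts[i] += Math.round(rightCount * overlap / rightWidth);
    }
  }
}
```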
### Null handling
If `druid.generic.useDefaultValueForNull` is false, null values will be tracked in the `missingValueCount` field of the histogram.
If `druid.generic.useDefaultValueForNull` is true, null values will be added to the histogram as the default 0.0 value.
## Histogram post-aggregators
Post-aggregators are used to transform opaque approximate histogram sketches
into bucketed histogram representations, as well as to compute various
distribution metrics such as quantiles, min, and max.
#### Equal buckets post-aggregator
### Equal buckets post-aggregator
Computes a visual representation of the approximate histogram with a given number of equal-sized bins.
Bucket intervals are based on the range of the underlying data.
Bucket intervals are based on the range of the underlying data. This aggregator is not supported for the fixed buckets histogram.
```json
{
@ -113,7 +248,7 @@ Bucket intervals are based on the range of the underlying data.
}
```
#### Buckets post-aggregator
### Buckets post-aggregator
Computes a visual representation given an initial breakpoint, offset, and a bucket size.
@ -121,6 +256,8 @@ Bucket size determines the width of the binning interval.
Offset determines the value on which those interval bins align.
This aggregator is not supported for the fixed buckets histogram.
```json
{
"type": "buckets",
@ -131,26 +268,28 @@ Offset determines the value on which those interval bins align.
}
```
#### Custom buckets post-aggregator
### Custom buckets post-aggregator
Computes a visual representation of the approximate histogram with bins laid out according to the given breaks.
This aggregator is not supported for the fixed buckets histogram.
```json
{ "type" : "customBuckets", "name" : <output_name>, "fieldName" : <aggregator_name>,
"breaks" : [ <value>, <value>, ... ] }
```
#### min post-aggregator
### min post-aggregator
Returns the minimum value of the underlying approximate histogram aggregator
Returns the minimum value of the underlying approximate or fixed buckets histogram aggregator
```json
{ "type" : "min", "name" : <output_name>, "fieldName" : <aggregator_name> }
```
#### max post-aggregator
### max post-aggregator
Returns the maximum value of the underlying approximate histogram aggregator
Returns the maximum value of the underlying approximate or fixed buckets histogram aggregator
```json
{ "type" : "max", "name" : <output_name>, "fieldName" : <aggregator_name> }
@ -158,7 +297,7 @@ Returns the maximum value of the underlying approximate histogram aggregator
#### quantile post-aggregator
Computes a single quantile based on the underlying approximate histogram aggregator
Computes a single quantile based on the underlying approximate or fixed buckets histogram aggregator
```json
{ "type" : "quantile", "name" : <output_name>, "fieldName" : <aggregator_name>,
@ -167,7 +306,7 @@ Computes a single quantile based on the underlying approximate histogram aggrega
#### quantiles post-aggregator
Computes an array of quantiles based on the underlying approximate histogram aggregator
Computes an array of quantiles based on the underlying approximate or fixed buckets histogram aggregator
```json
{ "type" : "quantiles", "name" : <output_name>, "fieldName" : <aggregator_name>,


@ -133,4 +133,20 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -87,4 +87,21 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -47,7 +47,8 @@ public class ApproximateHistogramDruidModule implements DruidModule
QuantilesPostAggregator.class,
QuantilePostAggregator.class,
MinPostAggregator.class,
MaxPostAggregator.class
MaxPostAggregator.class,
FixedBucketsHistogramAggregatorFactory.class
)
);
}
@ -59,6 +60,10 @@ public class ApproximateHistogramDruidModule implements DruidModule
ComplexMetrics.registerSerde("approximateHistogram", new ApproximateHistogramFoldingSerde());
}
if (ComplexMetrics.getSerdeForType(FixedBucketsHistogramAggregator.TYPE_NAME) == null) {
ComplexMetrics.registerSerde(FixedBucketsHistogramAggregator.TYPE_NAME, new FixedBucketsHistogramSerde());
}
if (binder != null) {
// Binder is null in some tests.
SqlBindings.addAggregator(binder, QuantileSqlAggregator.class);


@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.google.common.primitives.Longs;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
import java.util.Comparator;
public class FixedBucketsHistogramAggregator implements Aggregator
{
private static final Logger LOG = new Logger(FixedBucketsHistogramAggregator.class);
public static final String TYPE_NAME = "fixedBucketsHistogram";
public static final Comparator COMPARATOR = new Comparator()
{
@Override
public int compare(Object o, Object o1)
{
return Longs.compare(((FixedBucketsHistogram) o).getCount(), ((FixedBucketsHistogram) o1).getCount());
}
};
private final BaseObjectColumnValueSelector selector;
private FixedBucketsHistogram histogram;
public FixedBucketsHistogramAggregator(
BaseObjectColumnValueSelector selector,
double lowerLimit,
double upperLimit,
int numBuckets,
FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
)
{
this.selector = selector;
this.histogram = new FixedBucketsHistogram(
lowerLimit,
upperLimit,
numBuckets,
outlierHandlingMode
);
}
@Override
public void aggregate()
{
Object val = selector.getObject();
if (val == null) {
if (NullHandling.replaceWithDefault()) {
histogram.add(NullHandling.defaultDoubleValue());
} else {
histogram.incrementMissing();
}
} else if (val instanceof String) {
LOG.info((String) val);
histogram.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
} else if (val instanceof FixedBucketsHistogram) {
histogram.combineHistogram((FixedBucketsHistogram) val);
} else if (val instanceof Number) {
histogram.add(((Number) val).doubleValue());
} else {
throw new ISE("Unknown class for object: " + val.getClass());
}
}
@Nullable
@Override
public Object get()
{
return histogram;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("FixedBucketsHistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("FixedBucketsHistogramAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("FixedBucketsHistogramAggregator does not support getDouble()");
}
@Override
public boolean isNull()
{
return false;
}
@Override
public void close()
{
}
}


@ -0,0 +1,334 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.commons.codec.binary.Base64;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@JsonTypeName(FixedBucketsHistogramAggregator.TYPE_NAME)
public class FixedBucketsHistogramAggregatorFactory extends AggregatorFactory
{
private static final int DEFAULT_NUM_BUCKETS = 10;
private final String name;
private final String fieldName;
private double lowerLimit;
private double upperLimit;
private int numBuckets;
private FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode;
@JsonCreator
public FixedBucketsHistogramAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@Nullable @JsonProperty("numBuckets") Integer numBuckets,
@JsonProperty("lowerLimit") double lowerLimit,
@JsonProperty("upperLimit") double upperLimit,
@JsonProperty("outlierHandlingMode") FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
)
{
this.name = name;
this.fieldName = fieldName;
this.numBuckets = numBuckets == null ? DEFAULT_NUM_BUCKETS : numBuckets;
this.lowerLimit = lowerLimit;
this.upperLimit = upperLimit;
this.outlierHandlingMode = outlierHandlingMode;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new FixedBucketsHistogramAggregator(
metricFactory.makeColumnValueSelector(fieldName),
lowerLimit,
upperLimit,
numBuckets,
outlierHandlingMode
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FixedBucketsHistogramBufferAggregator(
metricFactory.makeColumnValueSelector(fieldName),
lowerLimit,
upperLimit,
numBuckets,
outlierHandlingMode
);
}
@Override
public Comparator getComparator()
{
return FixedBucketsHistogramAggregator.COMPARATOR;
}
@Nullable
@Override
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs == null) {
if (rhs == null) {
return null;
} else {
return rhs;
}
} else {
((FixedBucketsHistogram) lhs).combineHistogram((FixedBucketsHistogram) rhs);
return lhs;
}
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
return new ObjectAggregateCombiner()
{
private final FixedBucketsHistogram combined = new FixedBucketsHistogram(
lowerLimit,
upperLimit,
numBuckets,
outlierHandlingMode
);
@Override
public void reset(ColumnValueSelector selector)
{
FixedBucketsHistogram first = (FixedBucketsHistogram) selector.getObject();
combined.combineHistogram(first);
}
@Override
public void fold(ColumnValueSelector selector)
{
FixedBucketsHistogram other = (FixedBucketsHistogram) selector.getObject();
combined.combineHistogram(other);
}
@Override
public Class<FixedBucketsHistogram> classOfObject()
{
return FixedBucketsHistogram.class;
}
@Nullable
@Override
public FixedBucketsHistogram getObject()
{
return combined;
}
};
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FixedBucketsHistogramAggregatorFactory(
name,
name,
numBuckets,
lowerLimit,
upperLimit,
outlierHandlingMode
);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
return new FixedBucketsHistogramAggregatorFactory(
name,
name,
numBuckets,
lowerLimit,
upperLimit,
outlierHandlingMode
);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(
new FixedBucketsHistogramAggregatorFactory(
fieldName,
fieldName,
numBuckets,
lowerLimit,
upperLimit,
outlierHandlingMode
)
);
}
@Override
public Object deserialize(Object object)
{
if (object instanceof String) {
byte[] bytes = Base64.decodeBase64(StringUtils.toUtf8((String) object));
final FixedBucketsHistogram fbh = FixedBucketsHistogram.fromBytes(bytes);
return fbh;
} else {
return object;
}
}
@Nullable
@Override
public Object finalizeComputation(@Nullable Object object)
{
return object;
}
@JsonProperty
@Override
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
public String getTypeName()
{
return FixedBucketsHistogramAggregator.TYPE_NAME;
}
@Override
public int getMaxIntermediateSize()
{
return FixedBucketsHistogram.SERDE_HEADER_SIZE + FixedBucketsHistogram.getFullStorageSize(numBuckets);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length + Integer.BYTES * 2 + Double.BYTES * 2)
.put(AggregatorUtil.FIXED_BUCKET_HIST_CACHE_TYPE_ID)
.put(fieldNameBytes)
.putInt(outlierHandlingMode.ordinal())
.putInt(numBuckets)
.putDouble(lowerLimit)
.putDouble(upperLimit).array();
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public double getLowerLimit()
{
return lowerLimit;
}
@JsonProperty
public double getUpperLimit()
{
return upperLimit;
}
@JsonProperty
public int getNumBuckets()
{
return numBuckets;
}
@JsonProperty
public FixedBucketsHistogram.OutlierHandlingMode getOutlierHandlingMode()
{
return outlierHandlingMode;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FixedBucketsHistogramAggregatorFactory that = (FixedBucketsHistogramAggregatorFactory) o;
return Double.compare(that.getLowerLimit(), getLowerLimit()) == 0 &&
Double.compare(that.getUpperLimit(), getUpperLimit()) == 0 &&
getNumBuckets() == that.getNumBuckets() &&
Objects.equals(getName(), that.getName()) &&
Objects.equals(getFieldName(), that.getFieldName()) &&
getOutlierHandlingMode() == that.getOutlierHandlingMode();
}
@Override
public int hashCode()
{
return Objects.hash(
getName(),
getFieldName(),
getLowerLimit(),
getUpperLimit(),
getNumBuckets(),
getOutlierHandlingMode()
);
}
@Override
public String toString()
{
return "FixedBucketsHistogramAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
", lowerLimit=" + lowerLimit +
", upperLimit=" + upperLimit +
", numBuckets=" + numBuckets +
", outlierHandlingMode=" + outlierHandlingMode +
'}';
}
}


@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import java.nio.ByteBuffer;
public class FixedBucketsHistogramBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector selector;
private FixedBucketsHistogram histogram;
public FixedBucketsHistogramBufferAggregator(
BaseObjectColumnValueSelector selector,
double lowerLimit,
double upperLimit,
int numBuckets,
FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode
)
{
this.selector = selector;
this.histogram = new FixedBucketsHistogram(
lowerLimit,
upperLimit,
numBuckets,
outlierHandlingMode
);
}
@Override
public void init(ByteBuffer buf, int position)
{
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
mutationBuffer.put(histogram.toBytesFull(false));
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
FixedBucketsHistogram h0 = FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
Object val = selector.getObject();
if (val == null) {
// Match FixedBucketsHistogramAggregator and the documented behavior: replace nulls with the
// default value when replaceWithDefault() is true, otherwise track them as missing.
if (NullHandling.replaceWithDefault()) {
h0.add(NullHandling.defaultDoubleValue());
} else {
h0.incrementMissing();
}
} else if (val instanceof String) {
h0.combineHistogram(FixedBucketsHistogram.fromBase64((String) val));
} else if (val instanceof FixedBucketsHistogram) {
h0.combineHistogram((FixedBucketsHistogram) val);
} else {
Double x = ((Number) val).doubleValue();
h0.add(x);
}
mutationBuffer.position(position);
mutationBuffer.put(h0.toBytesFull(false));
}
@Override
public Object get(ByteBuffer buf, int position)
{
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
return FixedBucketsHistogram.fromByteBufferFullNoSerdeHeader(mutationBuffer);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("FixedBucketsHistogramBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("FixedBucketsHistogramBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("FixedBucketsHistogramBufferAggregator does not support getDouble()");
}
@Override
public void close()
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}


@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.Ordering;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class FixedBucketsHistogramSerde extends ComplexMetricSerde
{
private static final Logger LOG = new Logger(FixedBucketsHistogramSerde.class);
private static Ordering<FixedBucketsHistogram> comparator = new Ordering<FixedBucketsHistogram>()
{
@Override
public int compare(
FixedBucketsHistogram arg1,
FixedBucketsHistogram arg2
)
{
return FixedBucketsHistogramAggregator.COMPARATOR.compare(arg1, arg2);
}
}.nullsFirst();
@Override
public String getTypeName()
{
return FixedBucketsHistogramAggregator.TYPE_NAME;
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<FixedBucketsHistogram> extractedClass()
{
return FixedBucketsHistogram.class;
}
@Nullable
@Override
public Object extractValue(InputRow inputRow, String metricName)
{
throw new UnsupportedOperationException("extractValue without an aggregator factory is not supported.");
}
@Override
public FixedBucketsHistogram extractValue(InputRow inputRow, String metricName, AggregatorFactory agg)
{
Object rawValue = inputRow.getRaw(metricName);
FixedBucketsHistogramAggregatorFactory aggregatorFactory = (FixedBucketsHistogramAggregatorFactory) agg;
if (rawValue == null) {
FixedBucketsHistogram fbh = new FixedBucketsHistogram(
aggregatorFactory.getLowerLimit(),
aggregatorFactory.getUpperLimit(),
aggregatorFactory.getNumBuckets(),
aggregatorFactory.getOutlierHandlingMode()
);
if (NullHandling.replaceWithDefault()) {
fbh.add(NullHandling.defaultDoubleValue());
} else {
fbh.incrementMissing();
}
return fbh;
} else if (rawValue instanceof Number) {
FixedBucketsHistogram fbh = new FixedBucketsHistogram(
aggregatorFactory.getLowerLimit(),
aggregatorFactory.getUpperLimit(),
aggregatorFactory.getNumBuckets(),
aggregatorFactory.getOutlierHandlingMode()
);
fbh.add(((Number) rawValue).doubleValue());
return fbh;
} else if (rawValue instanceof FixedBucketsHistogram) {
return (FixedBucketsHistogram) rawValue;
} else if (rawValue instanceof String) {
Number numberAttempt;
try {
numberAttempt = Rows.objectToNumber(metricName, rawValue);
FixedBucketsHistogram fbh = new FixedBucketsHistogram(
aggregatorFactory.getLowerLimit(),
aggregatorFactory.getUpperLimit(),
aggregatorFactory.getNumBuckets(),
aggregatorFactory.getOutlierHandlingMode()
);
fbh.add(numberAttempt.doubleValue());
return fbh;
}
catch (ParseException pe) {
FixedBucketsHistogram fbh = FixedBucketsHistogram.fromBase64((String) rawValue);
return fbh;
}
} else {
throw new UnsupportedOperationException("Unknown type: " + rawValue.getClass());
}
}
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy getObjectStrategy()
{
return new ObjectStrategy<FixedBucketsHistogram>()
{
@Override
public Class<? extends FixedBucketsHistogram> getClazz()
{
return FixedBucketsHistogram.class;
}
@Override
public FixedBucketsHistogram fromByteBuffer(ByteBuffer buffer, int numBytes)
{
buffer.limit(buffer.position() + numBytes);
FixedBucketsHistogram fbh = FixedBucketsHistogram.fromByteBuffer(buffer);
return fbh;
}
@Override
public byte[] toBytes(FixedBucketsHistogram h)
{
if (h == null) {
return new byte[]{};
}
return h.toBytes();
}
@Override
public int compare(FixedBucketsHistogram o1, FixedBucketsHistogram o2)
{
return comparator.compare(o1, o2);
}
};
}
@Override
public GenericColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column
)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}


@ -68,8 +68,14 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
@Override
public Object compute(Map<String, Object> values)
{
final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
Object val = values.get(fieldName);
if (val instanceof ApproximateHistogram) {
final ApproximateHistogram ah = (ApproximateHistogram) val;
return ah.getMax();
} else {
final FixedBucketsHistogram fbh = (FixedBucketsHistogram) val;
return fbh.getMax();
}
}
@Override


@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
@ -68,8 +69,15 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
@Override
public Object compute(Map<String, Object> values)
{
final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
Object val = values.get(fieldName);
if (val instanceof ApproximateHistogram) {
final ApproximateHistogram ah = (ApproximateHistogram) val;
return ah.getMin();
} else if (val instanceof FixedBucketsHistogram) {
final FixedBucketsHistogram fbh = (FixedBucketsHistogram) val;
return fbh.getMin();
}
throw new ISE("Unknown value type: " + val.getClass());
}
@Override


@ -79,8 +79,15 @@ public class QuantilePostAggregator extends ApproximateHistogramPostAggregator
@Override
public Object compute(Map<String, Object> values)
{
final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
Object value = values.get(fieldName);
if (value instanceof ApproximateHistogram) {
final ApproximateHistogram ah = (ApproximateHistogram) value;
return ah.getQuantiles(new float[]{probability})[0];
} else {
final FixedBucketsHistogram fbh = (FixedBucketsHistogram) value;
float x = fbh.percentilesFloat(new double[]{probability * 100.0})[0];
return x;
}
}
@Override


@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
@ -71,9 +72,24 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
@Override
public Object compute(Map<String, Object> values)
{
final ApproximateHistogram ah = (ApproximateHistogram) values.get(fieldName);
Object val = values.get(fieldName);
if (val instanceof ApproximateHistogram) {
final ApproximateHistogram ah = (ApproximateHistogram) val;
return new Quantiles(probabilities, ah.getQuantiles(probabilities), ah.getMin(), ah.getMax());
} else if (val instanceof FixedBucketsHistogram) {
final FixedBucketsHistogram fbh = (FixedBucketsHistogram) val;
double[] adjustedProbabilites = new double[probabilities.length];
for (int i = 0; i < probabilities.length; i++) {
adjustedProbabilites[i] = probabilities[i] * 100.0;
}
return new Quantiles(
probabilities,
fbh.percentilesFloat(adjustedProbabilites),
(float) fbh.getMin(),
(float) fbh.getMax()
);
}
throw new ISE("Unknown value type: " + val.getClass());
}
@Override
@ -91,7 +107,7 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
@Override
public String toString()
{
return "EqualBucketsPostAggregator{" +
return "QuantilesPostAggregator{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
", probabilities=" + Arrays.toString(this.getProbabilities()) +


@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
*/
@RunWith(Parameterized.class)
public class FixedBucketsHistogramAggregationTest
{
private AggregationTestHelper helper;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
public FixedBucketsHistogramAggregationTest(final GroupByQueryConfig config)
{
ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule();
module.configure(null);
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Lists.newArrayList(module.getJacksonModules()),
config,
tempFolder
);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[]{config});
}
return constructors;
}
@After
public void teardown() throws IOException
{
helper.close();
}
@Test
public void testIngestWithNullsIgnoredAndQuery() throws Exception
{
MapBasedRow row = ingestAndQuery();
if (!NullHandling.replaceWithDefault()) {
Assert.assertEquals(92.782760, row.getMetric("index_min").floatValue(), 0.0001);
Assert.assertEquals(135.109191, row.getMetric("index_max").floatValue(), 0.0001);
Assert.assertEquals(135.9499969482422, row.getMetric("index_quantile").floatValue(), 0.0001);
} else {
Assert.assertEquals(0.0, row.getMetric("index_min"));
Assert.assertEquals(135.109191, row.getMetric("index_max").floatValue(), 0.0001);
Assert.assertEquals(135.8699951171875, row.getMetric("index_quantile").floatValue(), 0.0001);
}
}
private MapBasedRow ingestAndQuery() throws Exception
{
String ingestionAgg = FixedBucketsHistogramAggregator.TYPE_NAME;
String metricSpec = "[{"
+ "\"type\": \"" + ingestionAgg + "\","
+ "\"name\": \"index_fbh\","
+ "\"numBuckets\": 200,"
+ "\"lowerLimit\": 0,"
+ "\"upperLimit\": 200,"
+ "\"outlierHandlingMode\": \"overflow\","
+ "\"fieldName\": \"index\""
+ "}]";
String parseSpec = "{"
+ "\"type\" : \"string\","
+ "\"parseSpec\" : {"
+ " \"format\" : \"tsv\","
+ " \"timestampSpec\" : {"
+ " \"column\" : \"timestamp\","
+ " \"format\" : \"auto\""
+ "},"
+ " \"dimensionsSpec\" : {"
+ " \"dimensions\": [],"
+ " \"dimensionExclusions\" : [],"
+ " \"spatialDimensions\" : []"
+ " },"
+ " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+ " }"
+ "}";
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": \"test_datasource\","
+ "\"granularity\": \"ALL\","
+ "\"dimensions\": [],"
+ "\"aggregations\": ["
+ " {"
+ " \"type\": \"fixedBucketsHistogram\","
+ " \"name\": \"index_fbh\","
+ " \"fieldName\": \"index_fbh\","
+ " \"numBuckets\": 200,"
+ " \"lowerLimit\": 0,"
+ " \"upperLimit\": 200,"
+ " \"outlierHandlingMode\": \"overflow\""
+ " }"
+ "],"
+ "\"postAggregations\": ["
+ " { \"type\": \"min\", \"name\": \"index_min\", \"fieldName\": \"index_fbh\"},"
+ " { \"type\": \"max\", \"name\": \"index_max\", \"fieldName\": \"index_fbh\"},"
+ " { \"type\": \"quantile\", \"name\": \"index_quantile\", \"fieldName\": \"index_fbh\", \"probability\" : 0.99 }"
+ "],"
+ "\"intervals\": [ \"1970/2050\" ]"
+ "}";
Sequence seq = helper.createIndexAndRunQueryOnSegment(
this.getClass().getClassLoader().getResourceAsStream("sample.data.tsv"),
parseSpec,
metricSpec,
0,
Granularities.NONE,
50000,
query
);
return (MapBasedRow) seq.toList().get(0);
}
}


@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestFloatColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
public class FixedBucketsHistogramBufferAggregatorTest
{
private void aggregateBuffer(TestFloatColumnSelector selector, BufferAggregator agg, ByteBuffer buf, int position)
{
agg.aggregate(buf, position);
selector.increment();
}
@Test
public void testBufferAggregate()
{
final float[] values = {23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FixedBucketsHistogramAggregatorFactory factory = new FixedBucketsHistogramAggregatorFactory(
"billy",
"billy",
5,
0,
50,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
FixedBucketsHistogramBufferAggregator agg = new FixedBucketsHistogramBufferAggregator(
selector,
0,
50,
5,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < values.length; i++) {
aggregateBuffer(selector, agg, buf, position);
}
FixedBucketsHistogram h = ((FixedBucketsHistogram) agg.get(buf, position));
Assert.assertArrayEquals(
"final bin counts don't match expected counts",
new long[]{2, 3, 1, 3, 1}, h.getHistogram()
);
Assert.assertEquals("getMin value doesn't match expected getMin", 2, h.getMin(), 0);
Assert.assertEquals("getMax value doesn't match expected getMax", 45, h.getMax(), 0);
Assert.assertEquals("count doesn't match expected count", 10, h.getCount());
}
}


@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.TestHelper;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*/
@RunWith(Parameterized.class)
public class FixedBucketsHistogramGroupByQueryTest
{
private static final Closer resourceCloser = Closer.create();
private final QueryRunner<Row> runner;
private final GroupByQueryRunnerFactory factory;
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
final GroupByQueryConfig v1Config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V1;
}
@Override
public String toString()
{
return "v1";
}
};
final GroupByQueryConfig v1SingleThreadedConfig = new GroupByQueryConfig()
{
@Override
public boolean isSingleThreaded()
{
return true;
}
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V1;
}
@Override
public String toString()
{
return "v1SingleThreaded";
}
};
final GroupByQueryConfig v2Config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
@Override
public String toString()
{
return "v2";
}
};
v1Config.setMaxIntermediateRows(10000);
v1SingleThreadedConfig.setMaxIntermediateRows(10000);
final List<Object[]> constructors = new ArrayList<>();
final List<GroupByQueryConfig> configs = ImmutableList.of(
v1Config,
v1SingleThreadedConfig,
v2Config
);
for (GroupByQueryConfig config : configs) {
final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = GroupByQueryRunnerTest.makeQueryRunnerFactory(
config
);
final GroupByQueryRunnerFactory factory = factoryAndCloser.lhs;
resourceCloser.register(factoryAndCloser.rhs);
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",
config.toString(),
runner.toString()
);
constructors.add(new Object[]{testName, factory, runner});
}
}
return constructors;
}
public FixedBucketsHistogramGroupByQueryTest(
String testName,
GroupByQueryRunnerFactory factory,
QueryRunner runner
)
{
this.factory = factory;
this.runner = runner;
// Note: this is needed in order to properly register the serdes for the histogram aggregators.
new ApproximateHistogramDruidModule().configure(null);
}
@After
public void teardown() throws IOException
{
resourceCloser.close();
}
@Test
public void testGroupByWithFixedHistogramAgg()
{
FixedBucketsHistogramAggregatorFactory aggFactory = new FixedBucketsHistogramAggregatorFactory(
"histo",
"index",
10,
0,
2000,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran).setDimensions(new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
"marketalias"
))
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Collections.singletonList(new OrderByColumnSpec("marketalias", OrderByColumnSpec.Direction.DESCENDING)),
1
)
).setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, aggFactory)
.setPostAggregatorSpecs(
Collections.singletonList(
new QuantilePostAggregator("quantile", "histo", 0.5f)
)
)
.build();
List<Row> expectedResults = Collections.singletonList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"marketalias", "upfront",
"rows", 186L,
"quantile", 969.6969604492188f,
"histo",
new FixedBucketsHistogram(
0,
2000,
10,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
new long[]{0, 0, 4, 33, 66, 35, 25, 11, 10, 2},
186,
1870.061029,
545.990623,
0,
0,
0
)
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
@Test(expected = IllegalArgumentException.class)
public void testGroupByWithSameNameComplexPostAgg()
{
FixedBucketsHistogramAggregatorFactory aggFactory = new FixedBucketsHistogramAggregatorFactory(
"histo",
"index",
10,
0,
2000,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
GroupByQuery query = new GroupByQuery.Builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryRunnerTestHelper.allGran).setDimensions(new DefaultDimensionSpec(
QueryRunnerTestHelper.marketDimension,
"marketalias"
))
.setInterval(QueryRunnerTestHelper.fullOnInterval)
.setLimitSpec(
new DefaultLimitSpec(
Collections.singletonList(new OrderByColumnSpec("marketalias", OrderByColumnSpec.Direction.DESCENDING)),
1
)
).setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, aggFactory)
.setPostAggregatorSpecs(
Collections.singletonList(
new QuantilePostAggregator("quantile", "quantile", 0.5f)
)
)
.build();
List<Row> expectedResults = Collections.singletonList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"marketalias", "upfront",
"rows", 186L,
"quantile", 969.6969604492188f
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
}
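The expected "quantile" of 969.6969604492188 for the upfront row is consistent with linear interpolation inside a single bucket: the histogram has bounds [0, 2000) with 10 buckets (width 200), counts {0, 0, 4, 33, 66, 35, 25, 11, 10, 2}, and 186 total rows, so the median rank 0.5 * 186 = 93 lands in the fifth bucket. A hedged sketch of that computation, in the spirit of the Spectator-derived percentile code credited in the NOTICE file (an illustration of the arithmetic, not the extension's actual percentile implementation):

public class FixedBucketPercentileSketch
{
  // Locates the bucket containing the target rank, then interpolates linearly within it.
  static double percentile(double lower, double upper, long[] bins, long count, double pct)
  {
    final double bucketWidth = (upper - lower) / bins.length;
    final double targetRank = pct * count;
    long cumulative = 0;
    for (int i = 0; i < bins.length; i++) {
      if (bins[i] > 0 && cumulative + bins[i] >= targetRank) {
        double fractionOfBucket = (targetRank - cumulative) / bins[i];
        return lower + bucketWidth * (i + fractionOfBucket);
      }
      cumulative += bins[i];
    }
    return upper;
  }

  public static void main(String[] args)
  {
    long[] upfrontBins = {0, 0, 4, 33, 66, 35, 25, 11, 10, 2};
    // Prints 969.69..., matching the expected post-aggregated quantile above.
    System.out.println(percentile(0, 2000, upfrontBins, 186, 0.5));
  }
}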


@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class FixedBucketsHistogramTopNQueryTest
{
private static final Closer resourceCloser = Closer.create();
@AfterClass
public static void teardown() throws IOException
{
resourceCloser.close();
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(2000)
);
resourceCloser.register(defaultPool);
resourceCloser.register(customPool);
return QueryRunnerTestHelper.transformToConstructionFeeder(
Iterables.concat(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
)
);
}
private final QueryRunner runner;
public FixedBucketsHistogramTopNQueryTest(
QueryRunner runner
)
{
this.runner = runner;
}
@Test
public void testTopNWithFixedHistogramAgg()
{
FixedBucketsHistogramAggregatorFactory factory = new FixedBucketsHistogramAggregatorFactory(
"histo",
"index",
10,
0,
2000,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW
);
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),
factory
)
)
)
)
.postAggregators(
Arrays.asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg,
new QuantilePostAggregator("quantile", "histo", 0.5f)
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 216053.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.put("quantile", 1135.238f)
.put(
"histo",
new FixedBucketsHistogram(
0,
2000,
10,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
new long[]{0, 0, 0, 1, 21, 105, 42, 12, 5, 0},
186,
1743.92175,
792.326066,
0,
0,
0
)
)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 192420.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.put("quantile", 969.69696f)
.put(
"histo",
new FixedBucketsHistogram(
0,
2000,
10,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
new long[]{0, 0, 4, 33, 66, 35, 25, 11, 10, 2},
186,
1870.061029,
545.990623,
0,
0,
0
)
)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.put("quantile", 100.23952f)
.put(
"histo",
new FixedBucketsHistogram(
0,
2000,
10,
FixedBucketsHistogram.OutlierHandlingMode.OVERFLOW,
new long[]{835, 2, 0, 0, 0, 0, 0, 0, 0, 0},
837,
277.273533,
59.021022,
0,
0,
0
)
)
.build()
)
)
)
);
HashMap<String, Object> context = new HashMap<>();
List<Result<TopNResultValue>> results = runner.run(QueryPlus.wrap(query), context).toList();
TestHelper.assertExpectedResults(expectedResults, results);
}
}
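Every test in this file uses OutlierHandlingMode.OVERFLOW, which only matters when an input falls outside [lowerLimit, upperLimit). A minimal sketch of the distinction these mode names usually imply (IGNORE drops the value, OVERFLOW tracks it in a separate outlier counter, CLIP counts it toward the nearest end bucket); treat the exact semantics as an assumption rather than a statement about the extension's add() logic:

class OutlierHandlingSketch
{
  enum Mode { IGNORE, OVERFLOW, CLIP }

  private final long[] bins = new long[10];
  private long upperOutlierCount = 0;

  // Hypothetical handling of a single value above the upper limit.
  void addAboveUpperLimit(Mode mode)
  {
    switch (mode) {
      case IGNORE:
        break;                       // value is dropped entirely
      case OVERFLOW:
        upperOutlierCount++;         // tracked outside the bins, so the outlier count is preserved
        break;
      case CLIP:
        bins[bins.length - 1]++;     // folded into the last (highest) bucket
        break;
      default:
        break;
    }
  }
}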


@ -110,6 +110,9 @@ public class AggregatorUtil
public static final byte HLL_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x31;
public static final byte HLL_SKETCH_TO_ESTIMATE_AND_BOUNDS_CACHE_TYPE_ID = 0x32;
// Fixed buckets histogram aggregator
public static final byte FIXED_BUCKET_HIST_CACHE_TYPE_ID = 0x33;
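// Hedged illustration: like the other constants in this class, FIXED_BUCKET_HIST_CACHE_TYPE_ID
// reserves a unique byte that prefixes the aggregator's cache key, so cached results from
// different aggregator types cannot collide. The sketch below shows one possible key layout;
// the field order and contents are assumptions, not the factory's actual getCacheKey() code.
private static byte[] fixedBucketsHistogramCacheKeySketch(
    String fieldName,
    int numBuckets,
    double lowerLimit,
    double upperLimit,
    byte outlierMode
)
{
  final byte[] fieldNameBytes = fieldName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  return java.nio.ByteBuffer
      .allocate(2 + fieldNameBytes.length + Integer.BYTES + 2 * Double.BYTES)
      .put(FIXED_BUCKET_HIST_CACHE_TYPE_ID)
      .put(fieldNameBytes)
      .putInt(numBuckets)
      .putDouble(lowerLimit)
      .putDouble(upperLimit)
      .put(outlierMode)
      .array();
}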
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*


@ -194,7 +194,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
@Override
public Object getObject()
{
return extractor.extractValue(in.get(), column);
return extractor.extractValue(in.get(), column, agg);
}
@Override


@ -21,6 +21,7 @@ package org.apache.druid.segment.serde;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
@ -33,4 +34,10 @@ public interface ComplexMetricExtractor<T>
@Nullable
T extractValue(InputRow inputRow, String metricName);
@Nullable
default T extractValue(InputRow inputRow, String metricName, AggregatorFactory agg)
{
return extractValue(inputRow, metricName);
}
}
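The new three-argument default overload above is what the IncrementalIndex call site change shown earlier (passing agg) hooks into: it lets an extractor see the AggregatorFactory that is requesting the value, which a fixed buckets histogram needs in order to know the bounds and bucket count to build at ingestion time, while existing extractors keep working unchanged. A self-contained sketch of the pattern using simplified stand-in types (InputRowLike, FactoryLike, and the extractor below are hypothetical; only the shape of the default method mirrors the interface above):

interface InputRowLike
{
  Object getRaw(String metricName);
}

interface FactoryLike
{
  int getNumBuckets();
  double getLowerLimit();
  double getUpperLimit();
}

interface ExtractorLike<T>
{
  T extractValue(InputRowLike row, String metricName);

  // Mirrors the new default overload: extractors that ignore the factory are untouched,
  // while extractors that need aggregator configuration override this method instead.
  default T extractValue(InputRowLike row, String metricName, FactoryLike agg)
  {
    return extractValue(row, metricName);
  }
}

class HistogramExtractorSketch implements ExtractorLike<long[]>
{
  @Override
  public long[] extractValue(InputRowLike row, String metricName)
  {
    return new long[0]; // no bucket configuration is available without the factory
  }

  @Override
  public long[] extractValue(InputRowLike row, String metricName, FactoryLike agg)
  {
    final long[] bins = new long[agg.getNumBuckets()];
    final double width = (agg.getUpperLimit() - agg.getLowerLimit()) / agg.getNumBuckets();
    final double value = ((Number) row.getRaw(metricName)).doubleValue();
    final int bucket = (int) ((value - agg.getLowerLimit()) / width);
    if (bucket >= 0 && bucket < bins.length) {
      bins[bucket]++;
    }
    return bins;
  }
}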


@ -116,5 +116,4 @@ public class LargeColumnSupportedComplexColumnSerializer<T> implements GenericCo
{
writer.writeTo(channel, smoosher);
}
}