New: Add DDSketch in extensions-contrib (#15049)

* New: Add DDSketch-Druid extension

- Based off of http://www.vldb.org/pvldb/vol12/p2195-masson.pdf and uses
 the corresponding https://github.com/DataDog/sketches-java library
- contains tests for post building and using aggregation/post
  aggregation.
- New aggregator: `ddSketch`
- New post aggregators: `quantileFromDDSketch` and
  `quantilesFromDDSketch`

* Fixing easy CodeQL warnings/errors

* Fixing docs, and dependencies

Also moved aggregator ids to AggregatorUtil and PostAggregatorIds

* Adding more Docs and better null/empty handling for aggregators

* Fixing docs, and pom version

* DDSketch documentation format and wording
This commit is contained in:
Hiroshi Fukada 2024-01-23 07:47:07 -07:00 committed by GitHub
parent bf7d8680b3
commit 3fe3a65344
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 2489 additions and 0 deletions

View File

@ -443,6 +443,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-tdigestsketch</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-ddsketch</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:gce-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:aliyun-oss-extensions</argument>

View File

@ -83,6 +83,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.md)|
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.md)|
|druid-compressed-bigdecimal|Compressed Big Decimal Type | [link](../development/extensions-contrib/compressed-big-decimal.md)|
|druid-ddsketch|Support for DDSketch approximate quantiles based on [DDSketch](https://github.com/datadog/sketches-java) | [link](../development/extensions-contrib/ddsketch-quantiles.md)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.md)|
|druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.md)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.md)|

View File

@ -0,0 +1,139 @@
---
id: ddsketch-quantiles
title: "DDSketches for Approximate Quantiles module"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
This module provides aggregators for approximate quantile queries using the [DDSketch](https://github.com/datadog/sketches-java) library. The DDSketch library provides a fast, and fully-mergeable quantile sketch with relative error. If the true quantile is 100, a sketch with relative error of 1% guarantees a quantile value between 101 and 99. This is important and highly valuable behavior for long tail distributions. The best use case for these sketches is for accurately describing the upper quantiles of long tailed distributions such as network latencies.
To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) in the extensions load list.
```
druid.extensions.loadList=["druid-ddsketch", ...]
```
### Aggregator
The result of the aggregation is a DDSketch that is the union of all sketches either built from raw data or read from the segments. The single number that is returned represents the total number of included data points. The default aggregator type of `ddSketch` uses the collapsingLowestDense strategy for storing and merging sketch. This means that in favor of keeping the highest values represented at the highest accuracy, the sketch will collapse and merge lower, smaller values in the sketch. Collapsed bins will lose accuracy guarantees. The default number of bins is 1000. Sketches can only be merged when using the same relativeError values.
The `ddSketch` aggregator operates over raw data and precomputed sketches.
```json
{
"type" : "ddSketch",
"name" : <output_name>,
"fieldName" : <input_name>,
"relativeError" : <double(0, 1)>,
"numBins": <int>
}
```
|property|description|required?|
|--------|-----------|---------|
|type|Must be "ddSketch" |yes|
|name|A String for the output (result) name of the calculation.|yes|
|fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes|
|relativeError|Describes the precision in which to store the sketch. Must be a number between 0 and 1.|no, defaults to 0.01 (1% error)|
|numBins|Total number of bins the sketch is allowed to use to describe the distribution. This has a direct impact on max memory used. The more total bins available, the larger the range of accurate quantiles. With relative accuracy of 2%, only 275 bins are required to cover values between 1 millisecond and 1 minute. 800 bins are required to cover values between 1 nanosecond and 1 day.|no, defaults to 1000|
### Post Aggregators
To compute approximate quantiles, use `quantilesFromDDSketch` to query for a set of quantiles or `quantileFromDDSketch` to query for a single quantile. Call these post-aggregators on the sketches created by the `ddSketch` aggregators.
#### quantilesFromDDSketch
Use `quantilesFromDDSketch` to fetch multiple quantiles.
```json
{
"type" : "quantilesFromDDSketch",
"name" : <output_name>,
"field" : <reference to DDSketch>,
"fractions" : <array of doubles in [0,1]>
}
```
|property|description|required?|
|--------|-----------|---------|
|type|Must be "quantilesFromDDSketch" |yes|
|name|A String for the output (result) name of the calculation.|yes|
|field|A computed ddSketch.|yes|
|fractions|Array of doubles from 0 to 1 of the quantiles to compute|yes|
#### quantileFromDDSketch
Use `quantileFromDDSketch` to fetch a single quantile.
```json
{
"type" : "quantileFromDDSketch",
"name" : <output_name>,
"field" : <reference to DDsketch>,
"fraction" : <double [0,1]>
}
```
|property|description|required?|
|--------|-----------|---------|
|type|Must be "quantileFromDDSketch" |yes|
|name|A String for the output (result) name of the calculation.|yes|
|field|A computed ddSketch.|yes|
|fraction|A double from 0 to 1 of the quantile to compute|yes|
### Example
As an example of a query with sketches pre-aggregated at ingestion time, one could set up the following aggregator at ingest:
```json
{
"type": "ddSketch",
"name": "sketch",
"fieldName": "value",
"relativeError": 0.01,
"numBins": 1000,
}
```
Compute quantiles from the pre-aggregated sketches using the following aggregator and post-aggregator.
```json
{
"aggregations": [{
"type": "ddSketch",
"name": "sketch",
"fieldName": "sketch",
}],
"postAggregations": [
{
"type": "quantilesFromDDSketch",
"name": "quantiles",
"fractions": [0.5, 0.75, 0.9, 0.99],
"field": {
"type": "fieldAccess",
"fieldName": "sketch"
}
}]
}
```

View File

@ -0,0 +1,186 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>org.apache.druid</groupId>
<version>29.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-ddsketch</artifactId>
<name>druid-ddsketch</name>
<description>Druid extension for generating ddsketch backed sketches</description>
<dependencies>
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>sketches-java</artifactId>
<version>0.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-smile-provider</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.5.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<scope>provided</scope>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketches;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
/**
* Aggregator to build DDsketches on numeric values.
* It generally makes sense to use this aggregator during the ingestion time.
* <p>
* One can use this aggregator to build these sketches during query time too, just
* that it will be slower and more resource intensive.
*/
public class DDSketchAggregator implements Aggregator
{
private final ColumnValueSelector selector;
@GuardedBy("this")
private DDSketch histogram;
public DDSketchAggregator(ColumnValueSelector selector, @Nullable Double relativeError, @Nullable Integer numBins)
{
int effectiveNumBins = numBins != null ? numBins : DDSketchAggregatorFactory.DEFAULT_NUM_BINS;
double effectiveRelativeError = relativeError != null ? relativeError : DDSketchAggregatorFactory.DEFAULT_RELATIVE_ERROR;
this.selector = selector;
this.histogram = DDSketches.collapsingLowestDense(effectiveRelativeError, effectiveNumBins);
}
@Override
public void aggregate()
{
Object obj = selector.getObject();
if (obj == null) {
return;
}
synchronized (this) {
if (obj instanceof Number) {
this.histogram.accept(((Number) obj).doubleValue());
} else if (obj instanceof DDSketch) {
this.histogram.mergeWith((DDSketch) obj);
} else {
throw new IAE(
"Expected a number or an instance of DDSketch, but received [%s] of type [%s]",
obj,
obj.getClass()
);
}
}
}
@Nullable
@Override
public synchronized Object get()
{
return histogram;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("Casting to float type is not supported");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("Casting to long type is not supported");
}
@Override
public synchronized void close()
{
this.histogram = null;
}
}

View File

@ -0,0 +1,336 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketches;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.curator.shaded.com.google.common.math.IntMath;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Aggregation operations over the ddsketch quantile sketch
* available on <a href="https://github.com/DataDog/sketches-java">github</a> and described
* in the paper
* <a href="https://blog.acolyer.org/2019/09/06/ddsketch/">
* Computing relative error quantiles using ddsketch</a>.
* <p>
*/
@JsonTypeName(DDSketchAggregatorFactory.TYPE_NAME)
public class DDSketchAggregatorFactory extends AggregatorFactory
{
// Default relative error
public static final double DEFAULT_RELATIVE_ERROR = 0.01;
// Default num bins
public static final int DEFAULT_NUM_BINS = 1000;
@Nonnull
private final String name;
@Nonnull
private final String fieldName;
private final double relativeError;
private final int numBins;
private final byte cacheTypeId;
public static final String TYPE_NAME = "ddSketch";
public static final ColumnType TYPE = ColumnType.ofComplex(TYPE_NAME);
@JsonCreator
public DDSketchAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("relativeError") final Double relativeError,
@JsonProperty("numBins") final Integer numBins
)
{
this(name, fieldName, relativeError, numBins, AggregatorUtil.DDSKETCH_CACHE_TYPE_ID);
}
DDSketchAggregatorFactory(
final String name,
final String fieldName,
@Nullable final Double relativeError,
@Nullable final Integer numBins,
final byte cacheTypeId
)
{
this.name = Objects.requireNonNull(name, "Must have a valid, non-null aggregator name");
this.fieldName = Objects.requireNonNull(fieldName, "Parameter fieldName must be specified");
this.relativeError = relativeError == null ? DEFAULT_RELATIVE_ERROR : relativeError;
this.numBins = numBins == null ? DEFAULT_NUM_BINS : numBins;
this.cacheTypeId = cacheTypeId;
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(
cacheTypeId
).appendString(fieldName).appendDouble(relativeError).appendInt(numBins).build();
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DDSketchAggregator(metricFactory.makeColumnValueSelector(fieldName), relativeError, numBins);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DDSketchBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), relativeError, numBins);
}
public static final Comparator<DDSketch> COMPARATOR = Comparator.nullsFirst(
Comparator.comparingLong(a -> a.serializedSize())
);
@Override
public Comparator getComparator()
{
return COMPARATOR;
}
@Override
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs == null) {
return rhs;
}
if (rhs == null) {
return lhs;
}
DDSketch union = (DDSketch) lhs;
union.mergeWith((DDSketch) rhs);
return union;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DDSketchAggregatorFactory(name, name, relativeError, numBins);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(
new DDSketchAggregatorFactory(
name,
fieldName,
relativeError,
numBins
)
);
}
@Override
public Object deserialize(Object serializedSketch)
{
return DDSketchUtils.deserialize(serializedSketch);
}
@Nullable
@Override
public Object finalizeComputation(@Nullable Object object)
{
return object == null ? null : ((DDSketch) object).getCount();
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public double getRelativeError()
{
return relativeError;
}
@JsonProperty
public int getNumBins()
{
return numBins;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
/**
* actual type is {@link DDSketch}
*/
@Override
public ColumnType getIntermediateType()
{
return TYPE;
}
@Override
public ColumnType getResultType()
{
return TYPE;
}
/*
* Each bounded lower collapsing store yields a max size of numBins * 8 bytes (size Of Double) in terms of size.
* Since the sketch contains a store for positive values and negative values, a fully filled sketch at maximum would contain:
* 2 * numBins * 8Bytes for storage. Other tracked members of the serialized sketch are constant,
* so we add 12 to account for these members. These members include mapping reconstruction, and zero counts.
* These are tracked through doubles and integers and do not increase in size as the sketch accepts new values and merged.
*
*/
@Override
public int getMaxIntermediateSize()
{
return IntMath.checkedMultiply(numBins, Double.BYTES * 2) // Postive + Negative Stores
+ Double.BYTES // zeroCount
+ Double.BYTES // gamma
+ Double.BYTES // indexOffset
+ Integer.BYTES // interpolationEnum
+ 12; // collective protoscope descriptor max sizes
}
@Override
public AggregatorFactory withName(String newName)
{
return new DDSketchAggregatorFactory(newName, getFieldName(), getRelativeError(), getNumBins(), cacheTypeId);
}
@Override
public AggregateCombiner<DDSketch> makeAggregateCombiner()
{
return new ObjectAggregateCombiner<DDSketch>()
{
private DDSketch combined = DDSketches.collapsingLowestDense(relativeError, numBins);
@Override
public void reset(final ColumnValueSelector selector)
{
combined.clear();
fold(selector);
}
@Override
public void fold(final ColumnValueSelector selector)
{
DDSketch other = (DDSketch) selector.getObject();
if (other == null) {
return;
}
combined.mergeWith(other);
}
@Nullable
@Override
public DDSketch getObject()
{
return combined;
}
@Override
public Class<DDSketch> classOfObject()
{
return DDSketch.class;
}
};
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || !getClass().equals(o.getClass())) {
return false;
}
final DDSketchAggregatorFactory that = (DDSketchAggregatorFactory) o;
return Objects.equals(name, that.name) &&
Objects.equals(fieldName, that.fieldName) &&
relativeError == that.relativeError &&
numBins == that.numBins;
}
@Override
public int hashCode()
{
return Objects.hash(name, fieldName, relativeError, numBins);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{"
+ "name=" + name
+ ", fieldName=" + fieldName
+ ", relativeError=" + relativeError
+ ", numBins=" + numBins
+ "}";
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketches;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
/**
* Aggregator that builds DDSketch backed sketch using numeric values read from {@link ByteBuffer}
*/
public class DDSketchBufferAggregator implements BufferAggregator
{
@Nonnull
private final ColumnValueSelector selector;
private final double relativeError;
private final int numBins;
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DDSketch>> sketchCache = new IdentityHashMap();
public DDSketchBufferAggregator(
final ColumnValueSelector valueSelector,
final double relativeError,
final int numBins
)
{
Preconditions.checkNotNull(valueSelector);
this.selector = valueSelector;
this.relativeError = relativeError;
this.numBins = numBins;
}
@Override
public void init(ByteBuffer buffer, int position)
{
DDSketch sketch = DDSketches.collapsingLowestDense(relativeError, numBins);
ByteBuffer mutationBuffer = buffer.duplicate();
mutationBuffer.position(position);
addToCache(buffer, position, sketch);
}
@Override
public void aggregate(ByteBuffer buffer, int position)
{
Object x = selector.getObject();
if (x == null) {
return;
}
DDSketch sketch = sketchCache.get(buffer).get(position);
if (x instanceof Number) {
sketch.accept(((Number) x).doubleValue());
} else if (x instanceof DDSketch) {
sketch.mergeWith((DDSketch) x);
} else {
throw new IAE(
"Expected a number or an instance of DDSketch, but received [%s] of type [%s]",
x,
x.getClass()
);
}
}
@Override
public Object get(final ByteBuffer buffer, final int position)
{
// sketchCache is an IdentityHashMap where the reference of buffer is used for equality checks.
// So the returned object isn't impacted by the changes in the buffer object made by concurrent threads.
DDSketch obj = sketchCache.get(buffer).get(position);
return obj;
}
@Override
public float getFloat(final ByteBuffer buffer, final int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getLong(final ByteBuffer buffer, final int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{
sketchCache.clear();
}
@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
DDSketch sketch = sketchCache.get(oldBuffer).get(oldPosition);
addToCache(newBuffer, newPosition, sketch);
final Int2ObjectMap<DDSketch> map = sketchCache.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
sketchCache.remove(oldBuffer);
}
}
private void addToCache(final ByteBuffer buffer, final int position, final DDSketch sketch)
{
Int2ObjectMap<DDSketch> map = sketchCache.computeIfAbsent(buffer, b -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class DDSketchComplexMetricSerde extends ComplexMetricSerde
{
private static final DDSketchObjectStrategy STRATEGY = new DDSketchObjectStrategy();
@Override
public String getTypeName()
{
return DDSketchAggregatorFactory.TYPE_NAME;
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<?> extractedClass()
{
return DDSketch.class;
}
@Override
public Object extractValue(final InputRow inputRow, final String metricName)
{
final Object obj = inputRow.getRaw(metricName);
if (obj == null || obj instanceof Number || obj instanceof DDSketch) {
return obj;
}
if (obj instanceof String) {
String objString = (String) obj;
if (objString.isEmpty()) {
return null;
}
try {
Double doubleValue = Double.parseDouble(objString);
return doubleValue;
}
catch (NumberFormatException e) {
throw new IAE("Expected string with a number, received value: " + objString);
}
}
return DDSketchUtils.deserialize(obj);
}
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<DDSketch> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy<DDSketch> getObjectStrategy()
{
return STRATEGY;
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
public class DDSketchJsonSerializer extends JsonSerializer<DDSketch>
{
@Override
public void serialize(
DDSketch ddSketch,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider
) throws IOException
{
jsonGenerator.writeBinary(DDSketchUtils.toBytes(ddSketch));
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import java.util.List;
/**
* Module defining aggregators for the DDsketch based sketches
*/
public class DDSketchModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(
getClass().getSimpleName()
).registerSubtypes(
new NamedType(
DDSketchAggregatorFactory.class,
DDSketchAggregatorFactory.TYPE_NAME
),
new NamedType(
DDSketchToQuantilesPostAggregator.class,
DDSketchToQuantilesPostAggregator.TYPE_NAME
),
new NamedType(
DDSketchToQuantilePostAggregator.class,
DDSketchToQuantilePostAggregator.TYPE_NAME
)
).addSerializer(DDSketch.class, new DDSketchJsonSerializer())
);
}
@Override
public void configure(Binder binder)
{
registerSerde();
}
@VisibleForTesting
public static void registerSerde()
{
ComplexMetrics.registerSerde(DDSketchAggregatorFactory.TYPE_NAME, new DDSketchComplexMetricSerde());
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding;
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.druid.segment.data.ObjectStrategy;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class DDSketchObjectStrategy implements ObjectStrategy<DDSketch>
{
private static final byte[] EMPTY_BYTES = new byte[0];
@Override
public Class<DDSketch> getClazz()
{
return DDSketch.class;
}
@Override
public DDSketch fromByteBuffer(ByteBuffer buffer, int numBytes)
{
if (numBytes == 0) {
return null;
}
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(buffer.position() + numBytes);
try {
com.datadoghq.sketch.ddsketch.proto.DDSketch proto = com.datadoghq.sketch.ddsketch.proto.DDSketch.parseFrom(readOnlyBuffer);
DDSketch recovered = DDSketchProtoBinding.fromProto(() -> new CollapsingLowestDenseStore(1000), proto);
return recovered;
}
catch (InvalidProtocolBufferException e) {
throw new UnsupportedOperationException("Unable to decode from Proto");
}
}
@Override
public byte[] toBytes(@Nullable DDSketch val)
{
if (val == null) {
return EMPTY_BYTES;
}
return DDSketchProtoBinding.toProto(val).toByteArray();
}
@Override
public int compare(DDSketch o1, DDSketch o2)
{
return DDSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Post aggregation operator that can take in aggregated ddsketches and
* generate quantiles from it.
*/
public class DDSketchToQuantilePostAggregator implements PostAggregator
{
private final String name;
private final PostAggregator field;
private final double fraction;
public static final String TYPE_NAME = "quantileFromDDSketch";
private static final EmittingLogger log = new EmittingLogger(DDSketchToQuantilePostAggregator.class);
@JsonCreator
public DDSketchToQuantilePostAggregator(
@JsonProperty("name") final String name,
@JsonProperty("field") final PostAggregator field,
@JsonProperty("fraction") final double fraction
)
{
this.name = Preconditions.checkNotNull(name, "name is null");
this.field = Preconditions.checkNotNull(field, "field is null");
this.fraction = fraction;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public ColumnType getType(ColumnInspector signature)
{
return ColumnType.DOUBLE;
}
@JsonProperty
public PostAggregator getField()
{
return field;
}
@JsonProperty
public double getFraction()
{
return fraction;
}
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
final DDSketch sketch = (DDSketch) field.compute(combinedAggregators);
if (sketch == null || sketch.getCount() == 0) {
return Double.NaN;
}
return sketch.getValueAtQuantile(fraction);
}
@Override
public Comparator<Double> getComparator()
{
return Doubles::compare;
}
@Override
public Set<String> getDependentFields()
{
return field.getDependentFields();
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"name='" + name + '\'' +
", field=" + field +
", fraction=" + fraction +
"}";
}
@Override
public byte[] getCacheKey()
{
final CacheKeyBuilder builder = new CacheKeyBuilder(PostAggregatorIds.DDSKETCH_QUANTILE_TYPE_ID).appendCacheable(field);
builder.appendDouble(fraction);
return builder.build();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DDSketchToQuantilePostAggregator that = (DDSketchToQuantilePostAggregator) o;
return Double.compare(that.fraction, fraction) == 0 &&
Objects.equals(name, that.name) &&
Objects.equals(field, that.field);
}
@Override
public int hashCode()
{
return Objects.hash(name, field, fraction);
}
@Override
public PostAggregator decorate(final Map<String, AggregatorFactory> map)
{
return this;
}
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
* Post aggregation operator that can take in aggregated ddsketches and
* generate quantiles from it.
*/
public class DDSketchToQuantilesPostAggregator implements PostAggregator
{
private final String name;
private final PostAggregator field;
private final double[] fractions;
public static final String TYPE_NAME = "quantilesFromDDSketch";
@JsonCreator
public DDSketchToQuantilesPostAggregator(
@JsonProperty("name") final String name,
@JsonProperty("field") final PostAggregator field,
@JsonProperty("fractions") final double[] fractions
)
{
this.name = Preconditions.checkNotNull(name, "name is null");
this.field = Preconditions.checkNotNull(field, "field is null");
this.fractions = Preconditions.checkNotNull(fractions, "array of fractions is null");
Preconditions.checkArgument(this.fractions.length >= 1, "Array of fractions cannot be empty");
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public ColumnType getType(ColumnInspector signature)
{
return ColumnType.DOUBLE_ARRAY;
}
@JsonProperty
public PostAggregator getField()
{
return field;
}
@JsonProperty
public double[] getFractions()
{
return fractions;
}
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
final DDSketch sketch = (DDSketch) field.compute(combinedAggregators);
if (sketch == null || sketch.getCount() == 0) {
return Double.NaN;
}
double[] quantiles = new double[fractions.length];
int i = 0;
for (double f : fractions) {
quantiles[i++] = sketch.getValueAtQuantile(f);
}
return quantiles;
}
@Override
public Comparator<double[]> getComparator()
{
throw new IAE("Comparing arrays of quantiles is not supported");
}
@Override
public Set<String> getDependentFields()
{
return field.getDependentFields();
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"name='" + name + '\'' +
", field=" + field +
", fractions=" + Arrays.toString(fractions) +
"}";
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DDSketchToQuantilesPostAggregator that = (DDSketchToQuantilesPostAggregator) o;
if (!name.equals(that.name)) {
return false;
}
if (!Arrays.equals(fractions, that.fractions)) {
return false;
}
return field.equals(that.field);
}
@Override
public int hashCode()
{
return (name.hashCode() * 31 + field.hashCode()) * 31 + Arrays.hashCode(fractions);
}
@Override
public byte[] getCacheKey()
{
final CacheKeyBuilder builder = new CacheKeyBuilder(
PostAggregatorIds.DDSKETCH_QUANTILES_TYPE_ID).appendCacheable(field);
for (final double value : fractions) {
builder.appendDouble(value);
}
return builder.build();
}
@Override
public PostAggregator decorate(final Map<String, AggregatorFactory> map)
{
return this;
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding;
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
public class DDSketchUtils
{
// Class is not meant to be instantiated
private DDSketchUtils()
{
}
public static DDSketch deserialize(Object serializedSketch)
{
try {
if (serializedSketch instanceof String) {
String str = (String) serializedSketch;
byte[] bytes = StringUtils.decodeBase64(StringUtils.toUtf8(str));
com.datadoghq.sketch.ddsketch.proto.DDSketch proto = com.datadoghq.sketch.ddsketch.proto.DDSketch.parseFrom(bytes);
DDSketch recovered = DDSketchProtoBinding.fromProto(() -> new CollapsingLowestDenseStore(1000), proto);
return recovered;
} else if (serializedSketch instanceof byte[]) {
com.datadoghq.sketch.ddsketch.proto.DDSketch proto = com.datadoghq.sketch.ddsketch.proto.DDSketch.parseFrom((byte[]) serializedSketch);
DDSketch recovered = DDSketchProtoBinding.fromProto(() -> new CollapsingLowestDenseStore(1000), proto);
return recovered;
}
}
catch (InvalidProtocolBufferException e) {
throw new IAE(
"Object cannot be deserialized to a DDSketch Sketch: "
+ serializedSketch.getClass()
);
}
if (serializedSketch instanceof DDSketch) {
return (DDSketch) serializedSketch;
}
throw new IAE(
"Object cannot be deserialized to a DDSketch Sketch: "
+ serializedSketch.getClass()
);
}
static byte[] toBytes(DDSketch sketch)
{
return DDSketchProtoBinding.toProto(sketch).toByteArray();
}
public static boolean matchingAggregatorFactoryExists(
final VirtualColumnRegistry virtualColumnRegistry,
final DruidExpression input,
final double relativeError,
final int numBins,
final DDSketchAggregatorFactory factory
)
{
// Check input for equivalence.
final boolean inputMatches;
final DruidExpression virtualInput =
virtualColumnRegistry.findVirtualColumnExpressions(factory.requiredFields())
.stream()
.findFirst()
.orElse(null);
if (virtualInput == null) {
inputMatches = input.isDirectColumnAccess() && input.getDirectColumn().equals(factory.getFieldName());
} else {
inputMatches = virtualInput.equals(input);
}
return inputMatches && relativeError == factory.getRelativeError() && numBins == factory.getNumBins();
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.query.aggregation.ddsketch.DDSketchModule

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
import org.junit.Test;
public class DDSketchAggregatorFactoryTest
{
@Test
public void testResultArraySignature()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource("dummy")
.intervals("2000/3000")
.granularity(Granularities.HOUR)
.aggregators(
new CountAggregatorFactory("count"),
new DDSketchAggregatorFactory("ddsketch", "col", 0.01, 1000)
)
.postAggregators(
new FieldAccessPostAggregator("ddsketch-access", "ddsketch"),
new FinalizingFieldAccessPostAggregator("ddsketch-finalize", "ddsketch")
)
.build();
Assert.assertEquals(
RowSignature.builder()
.addTimeColumn()
.add("count", ColumnType.LONG)
.add("ddsketch", DDSketchAggregatorFactory.TYPE)
.add("ddsketch-access", DDSketchAggregatorFactory.TYPE)
.add("ddsketch-finalize", DDSketchAggregatorFactory.TYPE)
.build(),
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);
}
@Test
public void testWithName()
{
DDSketchAggregatorFactory factory = new DDSketchAggregatorFactory("ddsketch", "col", 0.01, 1000);
Assert.assertEquals(factory, factory.withName("ddsketch"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());
}
}

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@RunWith(Parameterized.class)
public class DDSketchAggregatorTest extends InitializedNullHandlingTest
{
private final AggregationTestHelper helper;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
public DDSketchAggregatorTest(final GroupByQueryConfig config)
{
DDSketchModule module = new DDSketchModule();
DDSketchModule.registerSerde();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
module.getJacksonModules(), config, tempFolder);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[]{config});
}
return constructors;
}
// this is to test Json properties and equals
@Test
public void serializeDeserializeFactoryWithFieldName() throws Exception
{
ObjectMapper objectMapper = new DefaultObjectMapper();
new DDSketchModule().getJacksonModules().forEach(objectMapper::registerModule);
DDSketchAggregatorFactory factory = new DDSketchAggregatorFactory("name", "fieldName", 0.01, 1000);
AggregatorFactory other = objectMapper.readValue(
objectMapper.writeValueAsString(factory),
AggregatorFactory.class
);
Assert.assertEquals(factory, other);
}
@Test
public void buildingSketchesAtIngestionTime() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("doubles_build_data.tsv").getFile()),
String.join(
"\n",
"{",
" \"type\": \"string\",",
" \"parseSpec\": {",
" \"format\": \"tsv\",",
" \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},",
" \"dimensionsSpec\": {",
" \"dimensions\": [\"product\"],",
" \"dimensionExclusions\": [ \"sequenceNumber\"],",
" \"spatialDimensions\": []",
" },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }",
"}"
),
"[{\"type\": \"ddSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"relativeError\": 0.01}]",
0, // minTimestamp
Granularities.NONE,
10, // maxRowCount
String.join(
"\n",
"{",
" \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",",
" \"granularity\": \"ALL\",",
" \"dimensions\": [],",
" \"aggregations\": [",
" {\"type\": \"ddSketch\", \"name\": \"merged_sketch\", \"fieldName\": \"sketch\", "
+ "\"relativeError\": "
+ "0.01, \"numBins\": 10000}",
" ],",
" \"postAggregations\": [",
" {\"type\": \"quantilesFromDDSketch\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], "
+ "\"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"merged_sketch\"}}",
" ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}"
)
);
List<ResultRow> results = seq.toList();
Assert.assertEquals(1, results.size());
ResultRow row = results.get(0);
// post agg
Object quantilesObject = row.get(1); // "quantiles"
Assert.assertTrue(quantilesObject instanceof double[]);
double[] quantiles = (double[]) quantilesObject;
Assert.assertEquals(0.001, quantiles[0], 0.0006); // min value
Assert.assertEquals(NullHandling.replaceWithDefault() ? 0.47 : 0.5, quantiles[1], 0.05); // median value
Assert.assertEquals(1, quantiles[2], 0.05); // max value
}
@Test
public void buildingSketchesAtQueryTime() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("doubles_build_data.tsv").getFile()),
String.join(
"\n",
"{",
" \"type\": \"string\",",
" \"parseSpec\": {",
" \"format\": \"tsv\",",
" \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},",
" \"dimensionsSpec\": {",
" \"dimensions\": [\"sequenceNumber\", \"product\"],",
" \"dimensionExclusions\": [],",
" \"spatialDimensions\": []",
" },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" }",
"}"
),
"[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]",
0, // minTimestamp
Granularities.NONE,
10, // maxRowCount
String.join(
"\n",
"{",
" \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",",
" \"granularity\": \"ALL\",",
" \"dimensions\": [],",
" \"aggregations\": [",
" {\"type\": \"ddSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"relativeError\": 0.005, \"numBins\": 2000}",
" ],",
" \"postAggregations\": [",
" {\"type\": \"quantilesFromDDSketch\", \"name\": \"quantiles\", \"fractions\": [0.99, 0.995, 0.999, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}"
)
);
List<ResultRow> results = seq.toList();
Assert.assertEquals(1, results.size());
ResultRow row = results.get(0);
// post agg
Object quantilesObject = row.get(1); // "quantiles"
Assert.assertTrue(quantilesObject instanceof double[]);
double[] quantiles = (double[]) quantilesObject;
// All these tests test that the quantiles are within 1% of the exact quantile value
Assert.assertEquals(0.9838, quantiles[0], 0.9838 * 0.01); // p99
Assert.assertEquals(0.9860, quantiles[1], 0.9850 * 0.01); // p99.5
Assert.assertEquals(0.9927, quantiles[2], 0.9927 * 0.01); // p999
Assert.assertEquals(0.9952, quantiles[3], 0.9952 * 0.01); // max value
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.junit.Assert;
import org.junit.Test;
public class DDSketchToQuantilePostAggregatorTest
{
@Test
public void testSerde() throws Exception
{
DDSketchToQuantilePostAggregator there =
new DDSketchToQuantilePostAggregator("post", new ConstantPostAggregator("", 100), 0.5);
DefaultObjectMapper mapper = new DefaultObjectMapper();
DDSketchToQuantilePostAggregator andBackAgain = mapper.readValue(
mapper.writeValueAsString(there),
DDSketchToQuantilePostAggregator.class
);
Assert.assertEquals(there, andBackAgain);
Assert.assertArrayEquals(there.getCacheKey(), andBackAgain.getCacheKey());
Assert.assertEquals(there.getDependentFields(), andBackAgain.getDependentFields());
}
@Test
public void testToString()
{
PostAggregator postAgg =
new DDSketchToQuantilePostAggregator("post", new ConstantPostAggregator("", 100), 0.5);
Assert.assertEquals(
"DDSketchToQuantilePostAggregator{name='post', field=ConstantPostAggregator{name='', constantValue=100}, fraction=0.5}",
postAgg.toString()
);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(DDSketchToQuantilePostAggregator.class)
.withNonnullFields("name", "field", "fraction")
.usingGetClass()
.verify();
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.junit.Assert;
import org.junit.Test;
public class DDSketchToQuantilesPostAggregatorTest
{
@Test
public void testSerde() throws Exception
{
DDSketchToQuantilesPostAggregator there =
new DDSketchToQuantilesPostAggregator("post", new ConstantPostAggregator("", 100), new double[]{0.25, 0.75});
DefaultObjectMapper mapper = new DefaultObjectMapper();
DDSketchToQuantilesPostAggregator andBackAgain = mapper.readValue(
mapper.writeValueAsString(there),
DDSketchToQuantilesPostAggregator.class
);
Assert.assertEquals(there, andBackAgain);
Assert.assertArrayEquals(there.getCacheKey(), andBackAgain.getCacheKey());
Assert.assertEquals(there.getDependentFields(), andBackAgain.getDependentFields());
}
@Test
public void testToString()
{
PostAggregator postAgg = new DDSketchToQuantilesPostAggregator(
"post",
new ConstantPostAggregator("", 100),
new double[]{0.25, 0.75}
);
Assert.assertEquals(
"DDSketchToQuantilesPostAggregator{name='post', field=ConstantPostAggregator{name='', constantValue=100}, fractions=[0.25, 0.75]}",
postAgg.toString()
);
}
@Test
public void testComparator()
{
PostAggregator postAgg = new DDSketchToQuantilesPostAggregator(
"post",
new ConstantPostAggregator("", 100),
new double[]{0.25, 0.75}
);
Assert.assertThrows(
"Comparing arrays of quantiles is not supported",
IAE.class,
() -> postAgg.getComparator());
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(DDSketchToQuantilesPostAggregator.class)
.withNonnullFields("name", "field", "fractions")
.usingGetClass()
.verify();
}
}

View File

@ -0,0 +1,400 @@
2016010101 0 3 0.4806453614322793
2016010101 1 3 0.6419678871373409
2016010101 2 3
2016010101 3 3 0.7020558734824461
2016010101 4 3 0.22613138189319204
2016010101 5 3 0.06909583208106174
2016010101 6 3 0.5376346416013744
2016010101 7 3
2016010101 8 3 0.8149467521679741
2016010101 9 3 0.7251559245004248
2016010101 10 3 0.49855832099546715
2016010101 11 3 0.5798820881203658
2016010101 12 3 0.5677866915980911
2016010101 13 3
2016010101 14 3 0.8516993764638755
2016010101 15 3
2016010101 16 3 0.8109196906946308
2016010101 17 3 0.7791330245692353
2016010101 18 3 0.5499336255807227
2016010101 19 3 0.23268682623354076
2016010101 20 4
2016010101 21 4 0.6399367705457312
2016010101 22 4 0.6484832626361673
2016010101 23 4 0.1662318456280305
2016010101 24 4 0.19354936279837376
2016010101 25 4 0.5802200573751369
2016010101 26 4 0.4269381252768615
2016010101 27 4 0.3070593197796049
2016010101 28 4 0.35382552338561346
2016010101 29 4 0.7119445442397593
2016010101 30 4 0.7869733865139936
2016010101 31 4 0.0805272768355706
2016010101 32 4 0.03922392755570692
2016010101 33 4 0.5840598980488145
2016010101 34 4 0.20511232154618975
2016010101 35 4
2016010101 36 4 0.07710156514815447
2016010101 37 4 0.23290085820099904
2016010101 38 4
2016010101 39 4 0.5733869947288424
2016010101 40 1 0.9858872798659482
2016010101 41 1
2016010101 42 1 0.14839505115144447
2016010101 43 1 0.09840489123150087
2016010101 44 1 0.5479691071569414
2016010101 45 1 0.0023559980599913155
2016010101 46 1 0.7896086868719593
2016010101 47 1 0.0319014726614526
2016010101 48 1 0.842015759464531
2016010101 49 1 0.6604873440176756
2016010101 50 1
2016010101 51 1 0.6615216129493895
2016010101 52 1 0.8367143245438604
2016010101 53 1 0.9758944303783366
2016010101 54 1 0.5709096686275406
2016010101 55 1 0.21653637585091645
2016010101 56 1 0.3261165064735497
2016010101 57 1 0.5334133677491381
2016010101 58 1 0.8620204683908819
2016010101 59 1 0.5484128559617791
2016010101 60 4 0.12033006888377096
2016010101 61 4
2016010101 62 4
2016010101 63 4
2016010101 64 4 0.9858000457292002
2016010101 65 4 0.19210448139852032
2016010101 66 4 0.3974718376343238
2016010101 67 4 0.9732376558043744
2016010101 68 4 0.11356123536860396
2016010101 69 4
2016010101 70 4 0.8076862670275398
2016010101 71 4 0.09236347669493417
2016010101 72 4
2016010101 73 4 0.854189677115464
2016010101 74 4 0.7396706282809763
2016010101 75 4 0.31708622509985873
2016010101 76 4 0.43648603480196757
2016010101 77 4 0.589213905769546
2016010101 78 4 0.7415426026749161
2016010101 79 4 0.7358282894757189
2016010101 80 9
2016010101 81 9 0.4255911372929798
2016010101 82 9 0.9331910249355163
2016010101 83 9 0.06445900439995578
2016010101 84 9 0.8115413172888221
2016010101 85 9
2016010101 86 9 0.0963857458400349
2016010101 87 9 0.06153408750057188
2016010101 88 9 0.44027926988833244
2016010101 89 9
2016010101 90 9 0.5233141932162747
2016010101 91 9 0.32621493954167546
2016010101 92 9 0.34647299592637026
2016010101 93 9 0.15192824813669525
2016010101 94 9 0.644889890933122
2016010101 95 9
2016010101 96 9 0.3015940264437008
2016010101 97 9 0.435933310145303
2016010101 98 9 0.7965720726264395
2016010101 99 9 0.6948764513522069
2016010101 100 8
2016010101 101 8 0.9541985425546318
2016010101 102 8 0.4078660397769671
2016010101 103 8 0.3761817754153792
2016010101 104 8 0.4630916538268274
2016010101 105 8 0.17184297531868054
2016010101 106 8 0.616480413683724
2016010101 107 8 0.9958850843107127
2016010101 108 8 0.29264803594577704
2016010101 109 8 0.9282101022070045
2016010101 110 8 0.12103623749465953
2016010101 111 8 0.5756846725738404
2016010101 112 8 0.4986956805169892
2016010101 113 8 0.16368254315504893
2016010101 114 8 0.8246939105217244
2016010101 115 8
2016010101 116 8 0.2802464651921067
2016010101 117 8 0.14533675338382146
2016010101 118 8 0.1550635453509872
2016010101 119 8 0.9764250057102191
2016010101 120 3 0.5639798746175808
2016010101 121 3 0.5252357184891421
2016010101 122 3 0.4224461843890118
2016010101 123 3 0.8623764079415396
2016010101 124 3 0.23821471344004463
2016010101 125 3 0.6562988643211294
2016010101 126 3 0.6045632944796968
2016010101 127 3 0.9216081547045152
2016010101 128 3 0.9111787373931876
2016010101 129 3
2016010101 130 3 0.22309670266127934
2016010101 131 3 0.5610286454514603
2016010101 132 3 0.6449829420830484
2016010101 133 3 0.47359871694806055
2016010101 134 3 0.4456546777062259
2016010101 135 3 0.3233370634627728
2016010101 136 3 0.535941427413942
2016010101 137 3 0.1465746735321063
2016010101 138 3 0.1619540222600243
2016010101 139 3
2016010101 140 1 0.017761763118174123
2016010101 141 1 0.19652899466185436
2016010101 142 1 0.8918577110251682
2016010101 143 1 0.5483769339947813
2016010101 144 1 0.35583586810262346
2016010101 145 1 0.7872104182932219
2016010101 146 1 0.4708873523759258
2016010101 147 1
2016010101 148 1 0.5782684331898654
2016010101 149 1 0.9493458125552832
2016010101 150 1 0.34871248062641946
2016010101 151 1 0.12964044092772886
2016010101 152 1 0.7565381339014415
2016010101 153 1 0.722119729581673
2016010101 154 1 0.35411310281363473
2016010101 155 1 0.4485837785057891
2016010101 156 1
2016010101 157 1 0.29365186469373317
2016010101 158 1 0.28026386528276104
2016010101 159 1 0.04245162399196889
2016010101 160 3 0.17387064034440958
2016010101 161 3 0.08578972205632507
2016010101 162 3 0.14307939514143686
2016010101 163 3
2016010101 164 3 0.05790520846514535
2016010101 165 3 0.5340068761562542
2016010101 166 3 0.700106038458213
2016010101 167 3 0.38998802776748753
2016010101 168 3 0.5050112412495604
2016010101 169 3 0.4923503731785702
2016010101 170 3 0.09763476584855624
2016010101 171 3 0.9181948066342877
2016010101 172 3
2016010101 173 3 0.845482260534406
2016010101 174 3 0.43828851854546647
2016010101 175 3 0.753761527760726
2016010101 176 3 0.04212838877462455
2016010101 177 3 0.691468086019305
2016010101 178 3 0.5740697793884527
2016010101 179 3
2016010101 180 2
2016010101 181 2
2016010101 182 2 0.6391276620023679
2016010101 183 2 0.18995634100597447
2016010101 184 2
2016010101 185 2 0.3601348485475453
2016010101 186 2 0.5898107379081887
2016010101 187 2 0.7654577155215041
2016010101 188 2 0.9861893898445978
2016010101 189 2
2016010101 190 2
2016010101 191 2
2016010101 192 2 0.2502642896132842
2016010101 193 2 0.007316463522836103
2016010101 194 2 0.7995874341737429
2016010101 195 2 0.8767428241522481
2016010101 196 2 0.20911811774820832
2016010101 197 2 0.6944454810391126
2016010101 198 2 0.6138639733419406
2016010101 199 2 0.5340384213931233
2016010101 200 4 0.610341783366869
2016010101 201 4 0.8095356348162531
2016010101 202 4 0.14576711100717
2016010101 203 4 0.324791997579967
2016010101 204 4 0.7012027438404959
2016010101 205 4 0.6855125265148104
2016010101 206 4 0.725721465888873
2016010101 207 4 0.37334253570089415
2016010101 208 4 0.7033238012522983
2016010101 209 4 0.6289935861560849
2016010101 210 4 0.22100961408197517
2016010101 211 4 0.5361822265452533
2016010101 212 4 0.23524963349934325
2016010101 213 4
2016010101 214 4 0.2151612160248132
2016010101 215 4 0.27034057325897454
2016010101 216 4 0.45788942603194727
2016010101 217 4 0.1900006529735202
2016010101 218 4 0.21761539728764212
2016010101 219 4
2016010101 220 9 0.11191094372411481
2016010101 221 9 0.7257156210111483
2016010101 222 9
2016010101 223 9 0.09767407184252375
2016010101 224 9
2016010101 225 9 0.8016522677725126
2016010101 226 9 0.8944075176139713
2016010101 227 9 0.7071810476904448
2016010101 228 9 0.7425380900058187
2016010101 229 9
2016010101 230 9 0.1031211607034147
2016010101 231 9 0.38694779402631885
2016010101 232 9 0.6121565493162887
2016010101 233 9 0.08826787524008717
2016010101 234 9 0.34982652581050666
2016010101 235 9 0.294468865237702
2016010101 236 9
2016010101 237 9 0.5190906777357499
2016010101 238 9 0.2424354751098784
2016010101 239 9 0.7584304131139413
2016010101 240 7 0.35704199266816017
2016010101 241 7 0.6213205251006355
2016010101 242 7 0.6794778377157997
2016010101 243 7 0.9660152207885527
2016010101 244 7 0.746230867578865
2016010101 245 7
2016010101 246 7 0.6637336893540101
2016010101 247 7 0.527025000973831
2016010101 248 7
2016010101 249 7 0.3689478346414077
2016010101 250 7 0.1046606291981873
2016010101 251 7 0.42368572552625094
2016010101 252 7 0.10870686807188557
2016010101 253 7 0.06569693633418128
2016010101 254 7 0.29873141724229657
2016010101 255 7 0.29158560982689863
2016010101 256 7 0.7678017218931323
2016010101 257 7 0.8900303350507414
2016010101 258 7 0.4419580092209411
2016010101 259 7 0.6381120775261563
2016010101 260 3
2016010101 261 3
2016010101 262 3 0.4227980856443392
2016010101 263 3
2016010101 264 3 0.8755401132173695
2016010101 265 3 0.5275377089199973
2016010101 266 3 0.12424387758622746
2016010101 267 3 0.01547071944810885
2016010101 268 3 0.37451206779305857
2016010101 269 3 0.1989423043276275
2016010101 270 3 0.5949853939670747
2016010101 271 3
2016010101 272 3
2016010101 273 3 0.6788434157726136
2016010101 274 3 0.4138070035489033
2016010101 275 3 0.3262153201368553
2016010101 276 3
2016010101 277 3 0.43177816031851957
2016010101 278 3 0.551450932204876
2016010101 279 3 0.7282741792330263
2016010101 280 3 0.9122069202680759
2016010101 281 3 0.7413285089462801
2016010101 282 3 0.03663726371403986
2016010101 283 3 0.23947998113921076
2016010101 284 3 0.9362838173143953
2016010101 285 3 0.28291781154121487
2016010101 286 3 0.9083170701852669
2016010101 287 3 0.6955809083495521
2016010101 288 3 0.9102559703489196
2016010101 289 3 0.5856005115807994
2016010101 290 3
2016010101 291 3 0.7547680618605328
2016010101 292 3 0.3635413762669889
2016010101 293 3 0.5060093676499698
2016010101 294 3 0.748493032129933
2016010101 295 3 0.36173218418100006
2016010101 296 3 0.8017631866114252
2016010101 297 3 0.09251208639535535
2016010101 298 3 0.3956843833130532
2016010101 299 3 0.8965031193765175
2016010101 300 8 0.06138169953397199
2016010101 301 8 0.22791862853999423
2016010101 302 8 0.4246825688431949
2016010101 303 8 0.7695915902917281
2016010101 304 8
2016010101 305 8
2016010101 306 8 0.15549809858942576
2016010101 307 8 0.3236736994444922
2016010101 308 8
2016010101 309 8 0.44112090310236873
2016010101 310 8 0.28658459361862487
2016010101 311 8 0.9346348774247973
2016010101 312 8
2016010101 313 8 0.32921840037119676
2016010101 314 8
2016010101 315 8 0.5619515224721092
2016010101 316 8 0.5436607404043168
2016010101 317 8 0.13126793260709302
2016010101 318 8
2016010101 319 8 0.08486286173372692
2016010101 320 2 0.9380754465335691
2016010101 321 2 0.8698491012104429
2016010101 322 2 0.2141986220865666
2016010101 323 2 0.8191428099424286
2016010101 324 2 0.5374397266436216
2016010101 325 2 0.8014642292436202
2016010101 326 2 0.2454633759035828
2016010101 327 2 0.2659186693999648
2016010101 328 2 0.12215527116473579
2016010101 329 2 0.23512672887844477
2016010101 330 2 0.17317858307470202
2016010101 331 2 0.014761951009997776
2016010101 332 2
2016010101 333 2
2016010101 334 2
2016010101 335 2 0.4839228057946262
2016010101 336 2 0.13113562836707116
2016010101 337 2 0.5776063788487777
2016010101 338 2 0.18353702932146465
2016010101 339 2 0.9550943323447759
2016010101 340 3 0.010294130457498829
2016010101 341 3
2016010101 342 3
2016010101 343 3 0.043939221631064784
2016010101 344 3 0.468615649016912
2016010101 345 3 0.8182318625708176
2016010101 346 3
2016010101 347 3
2016010101 348 3 0.13438311444894857
2016010101 349 3 0.9612868278105434
2016010101 350 3 0.8957083777498813
2016010101 351 3 0.49303068183606236
2016010101 352 3 0.3907574108316315
2016010101 353 3 0.7609044660129155
2016010101 354 3 0.0015306502862820759
2016010101 355 3
2016010101 356 3 0.0777103319482042
2016010101 357 3 0.040512845904230654
2016010101 358 3
2016010101 359 3 0.8615439676963182
2016010101 360 0 0.541631191849709
2016010101 361 0 0.38839056303777064
2016010101 362 0
2016010101 363 0
2016010101 364 0 0.25282693081575114
2016010101 365 0
2016010101 366 0 0.8088940492058253
2016010101 367 0 0.23287052835067323
2016010101 368 0 0.2388792692348808
2016010101 369 0
2016010101 370 0 0.047812293417679674
2016010101 371 0 0.5904656835670964
2016010101 372 0
2016010101 373 0 0.21010216933405235
2016010101 374 0 0.6128169315116692
2016010101 375 0 0.0021979088847082773
2016010101 376 0
2016010101 377 0 0.029233195772592535
2016010101 378 0
2016010101 379 0 0.13492455955229932
2016010101 380 7 0.45162411597798047
2016010101 381 7 0.6017062629482749
2016010101 382 7
2016010101 383 7 0.6347243397708097
2016010101 384 7
2016010101 385 7 0.3679411384173339
2016010101 386 7 0.11111298782358625
2016010101 387 7 0.848348012358186
2016010101 388 7 0.20181516171015812
2016010101 389 7
2016010101 390 7
2016010101 391 7 0.03477179524923002
2016010101 392 7
2016010101 393 7
2016010101 394 7
2016010101 395 7 0.4974934049704761
2016010101 396 7 0.6947690881973858
2016010101 397 7 0.21185369837139645
2016010101 398 7
2016010101 399 7 0.6859065700191138
Can't render this file because it has a wrong number of fields in line 3.

View File

@ -222,6 +222,7 @@
<module>extensions-contrib/momentsketch</module>
<module>extensions-contrib/moving-average-query</module>
<module>extensions-contrib/tdigestsketch</module>
<module>extensions-contrib/ddsketch</module>
<module>extensions-contrib/influxdb-emitter</module>
<module>extensions-contrib/gce-extensions</module>
<module>extensions-contrib/aliyun-oss-extensions</module>

View File

@ -159,6 +159,9 @@ public class AggregatorUtil
public static final byte ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID = 0x4D;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID = 0x4E;
// DDSketch aggregator
public static final byte DDSKETCH_CACHE_TYPE_ID = 0x50;
/**
* Given a list of PostAggregators and the name of an output column, returns the minimal list of PostAggregators
* required to compute the output column.

View File

@ -68,4 +68,6 @@ public class PostAggregatorIds
public static final byte KLL_FLOATS_SKETCH_TO_STRING_CACHE_TYPE_ID = 44;
public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID = 45;
public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID = 46;
public static final byte DDSKETCH_QUANTILES_TYPE_ID = 51;
public static final byte DDSKETCH_QUANTILE_TYPE_ID = 52;
}

View File

@ -2383,3 +2383,10 @@ spectatorHistogramTimer
spectatorHistogramDistribution
percentileSpectatorHistogram
percentilesSpectatorHistogram
ddSketch
DDSketch
collapsingLowestDense
relativeError
numBins
quantilesFromDDSketch
quantileFromDDSketch