Datasketches quantiles more post-aggs (#7550)

* rank and CDF post-aggs

* added post-aggs to the module

* added new post-aggs

* moved post-agg IDs

* moved post-agg IDs
This commit is contained in:
Alexander Saydakov 2019-05-10 11:46:54 -07:00 committed by Himanshu
parent 1d49364d08
commit ca1a6649f6
7 changed files with 494 additions and 0 deletions

View File

@ -99,6 +99,31 @@ This returns an approximation to the histogram given an array of split points th
}
```
#### Rank
This returns an approximation to the rank of a given value, that is, the fraction of the distribution less than that value.
```json
{
"type" : "quantilesDoublesSketchToRank",
"name": <output name>,
"field" : <post aggregator that refers to a DoublesSketch (fieldAccess or another post aggregator)>,
"value" : <value>
}
```
#### CDF
This returns an approximation to the Cumulative Distribution Function given an array of split points that define the edges of the bins. An array of <i>m</i> unique, monotonically increasing split points divides the real number line into <i>m+1</i> consecutive disjoint intervals. The definition of an interval is inclusive of the left split point and exclusive of the right split point. The resulting array of fractions can be viewed as ranks of each split point with one additional rank that is always 1.
```json
{
"type" : "quantilesDoublesSketchToCDF",
"name": <output name>,
"field" : <post aggregator that refers to a DoublesSketch (fieldAccess or another post aggregator)>,
"splitPoints" : <array of split points>
}
```
#### Sketch Summary
This returns a summary of the sketch that can be used for debugging. This is the result of calling the toString() method.

View File

@ -42,6 +42,8 @@ public class DoublesSketchModule implements DruidModule
public static final String DOUBLES_SKETCH_HISTOGRAM_POST_AGG = "quantilesDoublesSketchToHistogram";
public static final String DOUBLES_SKETCH_QUANTILE_POST_AGG = "quantilesDoublesSketchToQuantile";
public static final String DOUBLES_SKETCH_QUANTILES_POST_AGG = "quantilesDoublesSketchToQuantiles";
public static final String DOUBLES_SKETCH_RANK_POST_AGG = "quantilesDoublesSketchToRank";
public static final String DOUBLES_SKETCH_CDF_POST_AGG = "quantilesDoublesSketchToCDF";
public static final String DOUBLES_SKETCH_TO_STRING_POST_AGG = "quantilesDoublesSketchToString";
@Override
@ -62,6 +64,8 @@ public class DoublesSketchModule implements DruidModule
new NamedType(DoublesSketchToHistogramPostAggregator.class, DOUBLES_SKETCH_HISTOGRAM_POST_AGG),
new NamedType(DoublesSketchToQuantilePostAggregator.class, DOUBLES_SKETCH_QUANTILE_POST_AGG),
new NamedType(DoublesSketchToQuantilesPostAggregator.class, DOUBLES_SKETCH_QUANTILES_POST_AGG),
new NamedType(DoublesSketchToRankPostAggregator.class, DOUBLES_SKETCH_RANK_POST_AGG),
new NamedType(DoublesSketchToCDFPostAggregator.class, DOUBLES_SKETCH_CDF_POST_AGG),
new NamedType(DoublesSketchToStringPostAggregator.class, DOUBLES_SKETCH_TO_STRING_POST_AGG)
).addSerializer(DoublesSketch.class, new DoublesSketchJsonSerializer())
);

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.yahoo.sketches.quantiles.DoublesSketch;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
 * Post-aggregator that turns a {@link DoublesSketch} into an approximation of the
 * Cumulative Distribution Function evaluated at a fixed array of split points.
 *
 * <p>The sketch is obtained by computing the wrapped {@code field} post-aggregator. The
 * result is a {@code double[]} with one more element than the number of split points.
 */
public class DoublesSketchToCDFPostAggregator implements PostAggregator
{
  private final String name;
  private final PostAggregator field;
  private final double[] splitPoints;

  /**
   * @param name        output name of this post-aggregator; must not be null
   * @param field       post-aggregator whose result is cast to a {@link DoublesSketch}; must not be null
   * @param splitPoints split points passed to {@link DoublesSketch#getCDF(double[])}; must not be null
   */
  @JsonCreator
  public DoublesSketchToCDFPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field,
      @JsonProperty("splitPoints") final double[] splitPoints
  )
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
    this.splitPoints = Preconditions.checkNotNull(splitPoints, "array of split points is null");
  }

  /**
   * Computes the CDF of the sketch produced by {@code field}.
   *
   * @return a {@code double[]} of length {@code splitPoints.length + 1}; every element is
   *         {@link Double#NaN} when the sketch is empty, otherwise the array returned by
   *         {@link DoublesSketch#getCDF(double[])}
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final DoublesSketch sketch = (DoublesSketch) field.compute(combinedAggregators);
    if (sketch.isEmpty()) {
      // Keep the result shape stable for an empty sketch: one NaN per interval.
      final double[] cdf = new double[splitPoints.length + 1];
      Arrays.fill(cdf, Double.NaN);
      return cdf;
    }
    return sketch.getCDF(splitPoints);
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  @JsonProperty
  public double[] getSplitPoints()
  {
    return splitPoints;
  }

  /**
   * The result is an array of fractions, for which no ordering is defined here.
   *
   * @throws IAE always
   */
  @Override
  public Comparator<double[]> getComparator()
  {
    throw new IAE("Comparing arrays of CDF values is not supported");
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           ", splitPoints=" + Arrays.toString(splitPoints) +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final DoublesSketchToCDFPostAggregator that = (DoublesSketchToCDFPostAggregator) o;
    if (!name.equals(that.name)) {
      return false;
    }
    if (!Arrays.equals(splitPoints, that.splitPoints)) {
      return false;
    }
    return field.equals(that.field);
  }

  @Override
  public int hashCode()
  {
    int hashCode = name.hashCode() * 31 + field.hashCode();
    hashCode = hashCode * 31 + Arrays.hashCode(splitPoints);
    return hashCode;
  }

  /**
   * Builds the cache key from the CDF type id, the wrapped field and every split point, in order.
   */
  @Override
  public byte[] getCacheKey()
  {
    final CacheKeyBuilder builder = new CacheKeyBuilder(
        PostAggregatorIds.QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID).appendCacheable(field);
    for (final double value : splitPoints) {
      builder.appendDouble(value);
    }
    return builder.build();
  }

  /**
   * No decoration is applied; this instance is returned unchanged.
   */
  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.yahoo.sketches.quantiles.DoublesSketch;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
 * Post-aggregator that returns the approximate rank of a fixed value within a
 * {@link DoublesSketch}, as reported by {@link DoublesSketch#getRank(double)}.
 *
 * <p>The sketch is obtained by computing the wrapped {@code field} post-aggregator.
 */
public class DoublesSketchToRankPostAggregator implements PostAggregator
{
  private final String name;
  private final PostAggregator field;
  private final double value;

  /**
   * @param name  output name of this post-aggregator; must not be null
   * @param field post-aggregator whose result is cast to a {@link DoublesSketch}; must not be null
   * @param value value whose rank is requested
   */
  @JsonCreator
  public DoublesSketchToRankPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field,
      @JsonProperty("value") final double value
  )
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
    this.value = value;
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  @JsonProperty
  public double getValue()
  {
    return value;
  }

  /**
   * Computes the rank of {@code value} in the sketch produced by {@code field}.
   *
   * @return the result of {@link DoublesSketch#getRank(double)}
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final DoublesSketch sketch = (DoublesSketch) field.compute(combinedAggregators);
    return sketch.getRank(value);
  }

  /**
   * @return a comparator that orders ranks with {@link Doubles#compare(double, double)}
   */
  @Override
  public Comparator<Double> getComparator()
  {
    return Doubles::compare;
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           ", value=" + value +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final DoublesSketchToRankPostAggregator that = (DoublesSketchToRankPostAggregator) o;
    if (!name.equals(that.name)) {
      return false;
    }
    // Double.compare (not ==/!=) keeps equals consistent with Double.hashCode used in hashCode():
    // 0.0 and -0.0 hash differently and must not be equal, and NaN must be equal to itself.
    if (Double.compare(value, that.value) != 0) {
      return false;
    }
    return field.equals(that.field);
  }

  @Override
  public int hashCode()
  {
    return (name.hashCode() * 31 + field.hashCode()) * 31 + Double.hashCode(value);
  }

  /**
   * Builds the cache key from the rank type id, the wrapped field and the value.
   */
  @Override
  public byte[] getCacheKey()
  {
    return new CacheKeyBuilder(PostAggregatorIds.QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID)
        .appendCacheable(field)
        .appendDouble(value)
        .build();
  }

  /**
   * No decoration is applied; this instance is returned unchanged.
   */
  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
 * Tests for {@link DoublesSketchToCDFPostAggregator}: one over a sketch that received no
 * values and one over a sketch built from six values, both using the single split point 4.
 */
public class DoublesSketchToCDFPostAggregatorTest
{
  @Test
  public void emptySketch()
  {
    // Nothing is aggregated, so the sketch handed to the post-aggregator is empty.
    final TestDoubleColumnSelectorImpl emptySelector = new TestDoubleColumnSelectorImpl(null);
    final Aggregator aggregator = new DoublesSketchBuildAggregator(emptySelector, 8);

    final double[] cdf = computeCdf(aggregator);

    Assert.assertNotNull(cdf);
    Assert.assertEquals(2, cdf.length);
    for (final double fraction : cdf) {
      Assert.assertTrue(Double.isNaN(fraction));
    }
  }

  @Test
  public void normalCase()
  {
    final double[] input = new double[] {1, 2, 3, 4, 5, 6};
    final TestDoubleColumnSelectorImpl columnSelector = new TestDoubleColumnSelectorImpl(input);
    final Aggregator aggregator = new DoublesSketchBuildAggregator(columnSelector, 8);
    // Feed every input value into the sketch, advancing the selector after each one.
    for (final double ignored : input) {
      aggregator.aggregate();
      columnSelector.increment();
    }

    final double[] cdf = computeCdf(aggregator);

    Assert.assertNotNull(cdf);
    Assert.assertEquals(2, cdf.length);
    Assert.assertEquals(0.5, cdf[0], 0);
    Assert.assertEquals(1.0, cdf[1], 0);
  }

  /**
   * Exposes the aggregator's current sketch under the key "sketch" and runs the CDF
   * post-aggregator over it with the single split point 4.
   */
  private static double[] computeCdf(final Aggregator aggregator)
  {
    final Map<String, Object> aggregated = new HashMap<>();
    aggregated.put("sketch", aggregator.get());

    final PostAggregator cdfPostAgg = new DoublesSketchToCDFPostAggregator(
        "cdf",
        new FieldAccessPostAggregator("field", "sketch"),
        new double[] {4} // in normalCase, half of the distribution is below 4
    );
    return (double[]) cdfPostAgg.compute(aggregated);
  }
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
 * Tests for {@link DoublesSketchToRankPostAggregator}: one over a sketch that received no
 * values and one over a sketch built from six values.
 */
public class DoublesSketchToRankPostAggregatorTest
{
  @Test
  public void emptySketch()
  {
    // Nothing is aggregated, so the sketch handed to the post-aggregator is empty.
    final TestDoubleColumnSelectorImpl emptySelector = new TestDoubleColumnSelectorImpl(null);
    final Aggregator aggregator = new DoublesSketchBuildAggregator(emptySelector, 8);

    final double rank = computeRank(aggregator, 0);

    Assert.assertTrue(Double.isNaN(rank));
  }

  @Test
  public void normalCase()
  {
    final double[] input = new double[] {1, 2, 3, 4, 5, 6};
    final TestDoubleColumnSelectorImpl columnSelector = new TestDoubleColumnSelectorImpl(input);
    final Aggregator aggregator = new DoublesSketchBuildAggregator(columnSelector, 8);
    // Feed every input value into the sketch, advancing the selector after each one.
    for (final double ignored : input) {
      aggregator.aggregate();
      columnSelector.increment();
    }

    final double rank = computeRank(aggregator, 4);

    Assert.assertEquals(0.5, rank, 0);
  }

  /**
   * Exposes the aggregator's current sketch under the key "sketch" and runs the rank
   * post-aggregator over it for the given value.
   */
  private static double computeRank(final Aggregator aggregator, final double value)
  {
    final Map<String, Object> aggregated = new HashMap<>();
    aggregated.put("sketch", aggregator.get());

    final PostAggregator rankPostAgg = new DoublesSketchToRankPostAggregator(
        "rank",
        new FieldAccessPostAggregator("field", "sketch"),
        value
    );
    return (double) rankPostAgg.compute(aggregated);
  }
}

View File

@ -48,4 +48,8 @@ public class PostAggregatorIds
public static final byte MOMENTS_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 24;
public static final byte MOMENTS_SKETCH_TO_MIN_CACHE_TYPE_ID = 25;
public static final byte MOMENTS_SKETCH_TO_MAX_CACHE_TYPE_ID = 26;
// Datasketches Quantiles sketch aggregator (part 2)
public static final byte QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID = 27;
public static final byte QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID = 28;
}