diff --git a/docs/content/development/extensions-core/datasketches-quantiles.md b/docs/content/development/extensions-core/datasketches-quantiles.md
index 2282de22ac8..62b2e1fd926 100644
--- a/docs/content/development/extensions-core/datasketches-quantiles.md
+++ b/docs/content/development/extensions-core/datasketches-quantiles.md
@@ -99,6 +99,31 @@ This returns an approximation to the histogram given an array of split points th
 }
 ```
 
+#### Rank
+
+This returns an approximation to the rank of a given value, i.e. the fraction of the distribution that is less than that value.
+
+```json
+{
+  "type" : "quantilesDoublesSketchToRank",
+  "name": <output name>,
+  "field" : <post aggregator that refers to a DoublesSketch (fieldAccess or another post aggregator)>,
+  "value" : <value>
+}
+```
+#### CDF
+
+This returns an approximation to the Cumulative Distribution Function given an array of split points that define the edges of the bins. An array of m unique, monotonically increasing split points divides the real number line into m+1 consecutive disjoint intervals. The definition of an interval is inclusive of the left split point and exclusive of the right split point. The resulting array of fractions can be viewed as the ranks of each split point, with one additional rank that is always 1.
+
+```json
+{
+  "type" : "quantilesDoublesSketchToCDF",
+  "name": <output name>,
+  "field" : <post aggregator that refers to a DoublesSketch (fieldAccess or another post aggregator)>,
+  "splitPoints" : <array of split points>
+}
+```
+
 #### Sketch Summary
 
 This returns a summary of the sketch that can be used for debugging. This is the result of calling toString() method.
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchModule.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchModule.java index 33f6949a584..43762ae28af 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchModule.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchModule.java @@ -42,6 +42,8 @@ public class DoublesSketchModule implements DruidModule public static final String DOUBLES_SKETCH_HISTOGRAM_POST_AGG = "quantilesDoublesSketchToHistogram"; public static final String DOUBLES_SKETCH_QUANTILE_POST_AGG = "quantilesDoublesSketchToQuantile"; public static final String DOUBLES_SKETCH_QUANTILES_POST_AGG = "quantilesDoublesSketchToQuantiles"; + public static final String DOUBLES_SKETCH_RANK_POST_AGG = "quantilesDoublesSketchToRank"; + public static final String DOUBLES_SKETCH_CDF_POST_AGG = "quantilesDoublesSketchToCDF"; public static final String DOUBLES_SKETCH_TO_STRING_POST_AGG = "quantilesDoublesSketchToString"; @Override @@ -62,6 +64,8 @@ public class DoublesSketchModule implements DruidModule new NamedType(DoublesSketchToHistogramPostAggregator.class, DOUBLES_SKETCH_HISTOGRAM_POST_AGG), new NamedType(DoublesSketchToQuantilePostAggregator.class, DOUBLES_SKETCH_QUANTILE_POST_AGG), new NamedType(DoublesSketchToQuantilesPostAggregator.class, DOUBLES_SKETCH_QUANTILES_POST_AGG), + new NamedType(DoublesSketchToRankPostAggregator.class, DOUBLES_SKETCH_RANK_POST_AGG), + new NamedType(DoublesSketchToCDFPostAggregator.class, DOUBLES_SKETCH_CDF_POST_AGG), new NamedType(DoublesSketchToStringPostAggregator.class, DOUBLES_SKETCH_TO_STRING_POST_AGG) ).addSerializer(DoublesSketch.class, new DoublesSketchJsonSerializer()) ); diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregator.java new file mode 100644 index 00000000000..272c227bba8 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregator.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.yahoo.sketches.quantiles.DoublesSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +public class DoublesSketchToCDFPostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + private final double[] splitPoints; + + @JsonCreator + public DoublesSketchToCDFPostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("splitPoints") final double[] splitPoints) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.splitPoints = Preconditions.checkNotNull(splitPoints, "array of split points is null"); + } + + @Override + public Object compute(final Map combinedAggregators) + { + final DoublesSketch sketch = (DoublesSketch) field.compute(combinedAggregators); + if (sketch.isEmpty()) { + final double[] cdf = new double[splitPoints.length + 1]; + Arrays.fill(cdf, Double.NaN); + return cdf; + } + return sketch.getCDF(splitPoints); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public PostAggregator getField() + { + return field; + } + + @JsonProperty + public double[] getSplitPoints() + { + return splitPoints; + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing histograms is not supported"); + } + 
+ @Override + public Set getDependentFields() + { + return field.getDependentFields(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + ", field=" + field + + ", splitPoints=" + Arrays.toString(splitPoints) + + "}"; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DoublesSketchToCDFPostAggregator that = (DoublesSketchToCDFPostAggregator) o; + if (!name.equals(that.name)) { + return false; + } + if (!Arrays.equals(splitPoints, that.splitPoints)) { + return false; + } + return field.equals(that.field); + } + + @Override + public int hashCode() + { + int hashCode = name.hashCode() * 31 + field.hashCode(); + hashCode = hashCode * 31 + Arrays.hashCode(splitPoints); + return hashCode; + } + + @Override + public byte[] getCacheKey() + { + final CacheKeyBuilder builder = new CacheKeyBuilder( + PostAggregatorIds.QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID).appendCacheable(field); + for (final double value : splitPoints) { + builder.appendDouble(value); + } + return builder.build(); + } + + @Override + public PostAggregator decorate(final Map map) + { + return this; + } + +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregator.java new file mode 100644 index 00000000000..3343d0c8cc0 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Doubles; +import com.yahoo.sketches.quantiles.DoublesSketch; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +public class DoublesSketchToRankPostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + private final double value; + + @JsonCreator + public DoublesSketchToRankPostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("value") final double value) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.value = value; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + 
public PostAggregator getField() + { + return field; + } + + @JsonProperty + public double getValue() + { + return value; + } + + @Override + public Object compute(final Map combinedAggregators) + { + final DoublesSketch sketch = (DoublesSketch) field.compute(combinedAggregators); + return sketch.getRank(value); + } + + @Override + public Comparator getComparator() + { + return new Comparator() + { + @Override + public int compare(final Double a, final Double b) + { + return Doubles.compare(a, b); + } + }; + } + + @Override + public Set getDependentFields() + { + return field.getDependentFields(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + ", field=" + field + + ", value=" + value + + "}"; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DoublesSketchToRankPostAggregator that = (DoublesSketchToRankPostAggregator) o; + if (!name.equals(that.name)) { + return false; + } + if (value != that.value) { + return false; + } + return field.equals(that.field); + } + + @Override + public int hashCode() + { + return (name.hashCode() * 31 + field.hashCode()) * 31 + Double.hashCode(value); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(PostAggregatorIds.QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID) + .appendCacheable(field).appendDouble(value).build(); + } + + @Override + public PostAggregator decorate(final Map map) + { + return this; + } + +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregatorTest.java new file mode 100644 index 00000000000..f41f3167083 --- /dev/null +++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToCDFPostAggregatorTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class DoublesSketchToCDFPostAggregatorTest +{ + @Test + public void emptySketch() + { + final TestDoubleColumnSelectorImpl selector = new TestDoubleColumnSelectorImpl(null); + final Aggregator agg = new DoublesSketchBuildAggregator(selector, 8); + + final Map fields = new HashMap<>(); + fields.put("sketch", agg.get()); + + final PostAggregator postAgg = new DoublesSketchToCDFPostAggregator( + "cdf", + new FieldAccessPostAggregator("field", "sketch"), + new double[] {4} + ); + + final double[] histogram = (double[]) postAgg.compute(fields); + 
Assert.assertNotNull(histogram); + Assert.assertEquals(2, histogram.length); + Assert.assertTrue(Double.isNaN(histogram[0])); + Assert.assertTrue(Double.isNaN(histogram[1])); + } + + @Test + public void normalCase() + { + final double[] values = new double[] {1, 2, 3, 4, 5, 6}; + final TestDoubleColumnSelectorImpl selector = new TestDoubleColumnSelectorImpl(values); + + final Aggregator agg = new DoublesSketchBuildAggregator(selector, 8); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < values.length; i++) { + agg.aggregate(); + selector.increment(); + } + + final Map fields = new HashMap<>(); + fields.put("sketch", agg.get()); + + final PostAggregator postAgg = new DoublesSketchToCDFPostAggregator( + "cdf", + new FieldAccessPostAggregator("field", "sketch"), + new double[] {4} // half of the distribution is below 4 + ); + + final double[] cdf = (double[]) postAgg.compute(fields); + Assert.assertNotNull(cdf); + Assert.assertEquals(2, cdf.length); + Assert.assertEquals(0.5, cdf[0], 0); + Assert.assertEquals(1.0, cdf[1], 0); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregatorTest.java new file mode 100644 index 00000000000..f9ffccc1602 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchToRankPostAggregatorTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class DoublesSketchToRankPostAggregatorTest +{ + @Test + public void emptySketch() + { + final TestDoubleColumnSelectorImpl selector = new TestDoubleColumnSelectorImpl(null); + final Aggregator agg = new DoublesSketchBuildAggregator(selector, 8); + + final Map fields = new HashMap<>(); + fields.put("sketch", agg.get()); + + final PostAggregator postAgg = new DoublesSketchToRankPostAggregator( + "rank", + new FieldAccessPostAggregator("field", "sketch"), + 0 + ); + + final double rank = (double) postAgg.compute(fields); + Assert.assertTrue(Double.isNaN(rank)); + } + + @Test + public void normalCase() + { + final double[] values = new double[] {1, 2, 3, 4, 5, 6}; + final TestDoubleColumnSelectorImpl selector = new TestDoubleColumnSelectorImpl(values); + + final Aggregator agg = new DoublesSketchBuildAggregator(selector, 8); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < values.length; i++) { + agg.aggregate(); + selector.increment(); + } + + final Map fields = new HashMap<>(); + fields.put("sketch", agg.get()); + + final PostAggregator postAgg = new DoublesSketchToRankPostAggregator( + 
"rank", + new FieldAccessPostAggregator("field", "sketch"), + 4 + ); + + final double rank = (double) postAgg.compute(fields); + Assert.assertEquals(0.5, rank, 0); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java index ea9fe883b03..537a180c1ac 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java @@ -48,4 +48,8 @@ public class PostAggregatorIds public static final byte MOMENTS_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 24; public static final byte MOMENTS_SKETCH_TO_MIN_CACHE_TYPE_ID = 25; public static final byte MOMENTS_SKETCH_TO_MAX_CACHE_TYPE_ID = 26; + + // Datasketches Quantiles sketch aggregator (part 2) + public static final byte QUANTILES_DOUBLES_SKETCH_TO_RANK_CACHE_TYPE_ID = 27; + public static final byte QUANTILES_DOUBLES_SKETCH_TO_CDF_CACHE_TYPE_ID = 28; }