diff --git a/docs/development/extensions-core/datasketches-tuple.md b/docs/development/extensions-core/datasketches-tuple.md index 0e540bc1396..22622f187bb 100644 --- a/docs/development/extensions-core/datasketches-tuple.md +++ b/docs/development/extensions-core/datasketches-tuple.md @@ -207,3 +207,46 @@ Returns a human-readable summary of a given ArrayOfDoublesSketch. This is a stri "field" : } ``` + + +### Constant ArrayOfDoublesSketch + +This post aggregator adds a Base64-encoded constant ArrayOfDoublesSketch value that you can use in other post aggregators. +```json +{ + "type": "arrayOfDoublesSketchConstant", + "name": DESTINATION_COLUMN_NAME, + "value": CONSTANT_SKETCH_VALUE +} +``` + +### Base64 output of ArrayOfDoublesSketch + +This post aggregator outputs an ArrayOfDoublesSketch as a Base64-encoded string storing the constant tuple sketch value that you can use in other post aggregators. + +```json +{ + "type": "arrayOfDoublesSketchToBase64String", + "name": DESTINATION_COLUMN_NAME, + "field": +} +``` + +### Estimated metrics values for each column of ArrayOfDoublesSketch + +For the key-value pairs in the given ArrayOfDoublesSketch, this post aggregator estimates the sum for each set of values across the keys. For example, the post aggregator returns `{3.0, 8.0}` for the following key-value pairs: + +``` +Key_1, {1.0, 3.0} +Key_2, {2.0, 5.0} +``` + +The post aggregator returns _N_ double values, where _N_ is the number of values associated with each key. + +```json +{ + "type": "arrayOfDoublesSketchToMetricsSumEstimate", + "name": DESTINATION_COLUMN_NAME, + "field": +} +``` diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregator.java new file mode 100755 index 00000000000..de65a3c1bf3 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.cache.CacheKeyBuilder; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * This post-aggregator converts a given Base64 encoded string to an ArrayOfDoublesSketch. + * The input column contains name of post-aggregator output and base64 encoded input string. + * The output is a deserialized {@link ArrayOfDoublesSketch} . + */ +public class ArrayOfDoublesSketchConstantPostAggregator extends ArrayOfDoublesSketchPostAggregator +{ + + private final String value; + private final ArrayOfDoublesSketch sketchValue; + + @JsonCreator + public ArrayOfDoublesSketchConstantPostAggregator(@JsonProperty("name") String name, @JsonProperty("value") String value) + { + super(name); + Preconditions.checkArgument(value != null && !value.isEmpty(), + "Constant value cannot be null or empty, expecting base64 encoded sketch string"); + this.value = value; + this.sketchValue = ArrayOfDoublesSketchOperations.deserializeFromBase64EncodedStringSafe(value); + } + + @Override + public Set getDependentFields() + { + return Collections.emptySet(); + } + + @Override + public Comparator getComparator() + { + return Comparators.alwaysEqual(); + } + + @Override + public Object compute(Map combinedAggregators) + { + return sketchValue; + } + + @Override + public ArrayOfDoublesSketchConstantPostAggregator decorate(Map aggregators) + { + return this; + } + + public ArrayOfDoublesSketch getSketchValue() + { + return sketchValue; + } + + @Override + public String toString() + { + return "ArrayOfDoublesSketchConstantPostAggregator{name='" + this.getName() + "', value='" + value + "'}"; + } + + private String getRawSketchValue() + { + return value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ArrayOfDoublesSketchConstantPostAggregator that = (ArrayOfDoublesSketchConstantPostAggregator) o; + if (!(Objects.equals(this.getName(), that.getName()) && Objects.equals(this.value, that.value) + && Objects.equals(this.getSketchValue(), that.getSketchValue()))) { + return false; + } + return true; + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), value, sketchValue); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID) + .appendString(DigestUtils.sha1Hex(value)).build(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java old mode 100644 new mode 100755 index 726925a0826..9fd421b9a4f --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java @@ -48,6 +48,13 @@ public class ArrayOfDoublesSketchModule implements DruidModule public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG); public static final ColumnType MERGE_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG); + public static final String ARRAY_OF_DOUBLES_SKETCH_CONSTANT = "arrayOfDoublesSketchConstant"; + + public static final String ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING = "arrayOfDoublesSketchToBase64String"; + + public static final String ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE = "arrayOfDoublesSketchToMetricsSumEstimate"; + + @Override public void configure(final Binder binder) { @@ -100,6 +107,18 @@ public class ArrayOfDoublesSketchModule implements DruidModule new NamedType( ArrayOfDoublesSketchToStringPostAggregator.class, "arrayOfDoublesSketchToString" + ), + new NamedType( + ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class, + ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE + ), + new NamedType( + ArrayOfDoublesSketchConstantPostAggregator.class, + ARRAY_OF_DOUBLES_SKETCH_CONSTANT + ), + new NamedType( + ArrayOfDoublesSketchToBase64StringPostAggregator.class, + ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING ) ).addSerializer(ArrayOfDoublesSketch.class, new ArrayOfDoublesSketchJsonSerializer()) ); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregator.java new file mode 100755 index 00000000000..0511f98ab61 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import java.util.Comparator; +import java.util.Map; + +/** + * Returns a base64 encoded string of a given {@link ArrayOfDoublesSketch}. + * This is a string returned by encoding the output of toByteArray() using Base64 method of the sketch. + * This can be useful for debugging and using the sketch output in other operations. + */ +public class ArrayOfDoublesSketchToBase64StringPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator +{ + + @JsonCreator + public ArrayOfDoublesSketchToBase64StringPostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field + ) + { + super(name, field); + } + + @Override + public String compute(final Map combinedAggregators) + { + final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators); + return StringUtils.encodeBase64String(sketch.toByteArray()); + } + + @Override + public ColumnType getType(ColumnInspector signature) + { + return ColumnType.STRING; + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing sketch summaries is not supported"); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID) + .appendCacheable(getField()) + .build(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.java new file mode 100755 index 00000000000..03c2de780be --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketchIterator; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; + +/** + * Returns a list of estimate values of metrics column from a given {@link ArrayOfDoublesSketch}. + * The result will be N double values, where N is the number of double values kept in the sketch per key. + */ +public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator +{ + + @JsonCreator + public ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field + ) + { + super(name, field); + } + + @Override + public double[] compute(final Map combinedAggregators) + { + final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators); + final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()]; + Arrays.setAll(stats, i -> new SummaryStatistics()); + final ArrayOfDoublesSketchIterator it = sketch.iterator(); + while (it.next()) { + final double[] values = it.getValues(); + for (int i = 0; i < values.length; i++) { + stats[i].addValue(values[i]); + } + } + final double[] estimates = new double[sketch.getNumValues()]; + Arrays.setAll(estimates, i -> (stats[i].getSum()) / (sketch.getTheta())); + return estimates; + } + + @Override + public ColumnType getType(ColumnInspector signature) + { + return ColumnType.DOUBLE_ARRAY; + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing arrays of estimate values is not supported"); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID) + .appendCacheable(getField()) + .build(); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java old mode 100644 new mode 100755 index badbf0a1110..3e42a245709 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java @@ -1244,4 +1244,97 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling Assert.assertEquals(0.0, ds3.getMinValue(), 0); Assert.assertEquals(3.0, ds3.getMaxValue(), 0); } + + + //Test ConstantTupleSketchPost-Agg and Base64 Encoding + + @Test + public void testConstantAndBase64WithEstimateSumPostAgg() throws Exception + { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("tuple/array_of_doubles_sketch_data_two_values.tsv") + .getFile()), + String.join( + "\n", + "{", + " \"type\": \"string\",", + " \"parseSpec\": {", + " \"format\": \"tsv\",", + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},", + " \"dimensionsSpec\": {", + " \"dimensions\": [\"product\"],", + " \"dimensionExclusions\": [],", + " \"spatialDimensions\": []", + " },", + " \"columns\": [\"timestamp\", \"product\", \"sketch\"]", + " }", + "}" + ), + String.join( + "\n", + "[", + " {\"type\": \"arrayOfDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"nominalEntries\": 1024, \"numberOfValues\": 2}", + "]" + ), + 0, // minTimestamp + Granularities.NONE, + 10, // maxRowCount + String.join( + "\n", + "{", + " \"queryType\": \"groupBy\",", + " \"dataSource\": \"test_datasource\",", + " \"granularity\": \"ALL\",", + " \"dimensions\": [],", + " \"aggregations\": [", + " {\"type\": \"arrayOfDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"nominalEntries\": 1024, \"numberOfValues\": 2}", + " ],", + " \"postAggregations\": [", + " {\"type\": \"arrayOfDoublesSketchToMetricsSumEstimate\", \"name\": \"estimateSum\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"arrayOfDoublesSketchToMetricsSumEstimate\", \"name\": \"intersection\", \"field\": {", + " \"type\": \"arrayOfDoublesSketchSetOp\",", + " \"name\": \"intersection\",", + " \"operation\": \"INTERSECT\",", + " \"nominalEntries\": 1024,", + " \"numberOfValues\": 2,", + " \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"arrayOfDoublesSketchConstant\", \"name\": \"external_sketch\", \"value\": \"AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAABA\"}]", + " }},", + " {\"type\": \"arrayOfDoublesSketchToBase64String\", \"name\": \"intersectionString\", \"field\": {", + " \"type\": \"arrayOfDoublesSketchSetOp\",", + " \"name\": \"intersection\",", + " \"operation\": \"INTERSECT\",", + " \"nominalEntries\": 1024,", + " \"numberOfValues\": 2,", + " \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"arrayOfDoublesSketchConstant\", \"name\": \"external_sketch\", \"value\": \"AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAABA\"}]", + " }}", + " ],", + " \"intervals\": [\"2015-01-01T00:00:00.000Z/2015-01-31T00:00:00.000Z\"]", + "}" + ) + ); + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + ResultRow row = results.get(0); + Assert.assertEquals("sketch", 40.0, (double) row.get(0), 0); + //Assert.assertEquals("estimateSum", 40.0, (double) row.get(1), 0); + + Object estimateSumObj = row.get(1); // estimateSum + Assert.assertTrue(estimateSumObj instanceof double[]); + double[] estimateSum = (double[]) estimateSumObj; + Assert.assertEquals(2, estimateSum.length); + Assert.assertEquals(40.0, estimateSum[0], 0); + Assert.assertEquals(80.0, estimateSum[1], 0); + + Object intersectEstimateSumObj = row.get(2); // intersectEstimateSum + Assert.assertTrue(intersectEstimateSumObj instanceof double[]); + double[] intersectEstimateSum = (double[]) intersectEstimateSumObj; + Assert.assertEquals(2, intersectEstimateSum.length); + Assert.assertEquals(4.0, intersectEstimateSum[0], 0); + Assert.assertEquals(8.0, intersectEstimateSum[1], 0); + + //convert intersected to base64 string + Assert.assertEquals("intersectionString", "AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAABAAAAAAAAAEEAAAAAAAAAAQAAAAAAAABBA", (String) row.get(3)); + + + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregatorTest.java new file mode 100644 index 00000000000..9938a0e22e2 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchConstantPostAggregatorTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.query.aggregation.PostAggregator; +import org.junit.Assert; +import org.junit.Test; + + +public class ArrayOfDoublesSketchConstantPostAggregatorTest +{ + + @Test + public void testSketchValue() + { + final String value = "AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA="; + final ArrayOfDoublesSketchConstantPostAggregator postAgg = new ArrayOfDoublesSketchConstantPostAggregator( + "constant_sketch", + value + ); + Assert.assertNotNull(postAgg.getSketchValue()); + } + + + @Test + public void testToString() + { + final PostAggregator postAgg = new ArrayOfDoublesSketchConstantPostAggregator( + "constant_sketch", + "AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA=" + ); + + Assert.assertEquals( + "ArrayOfDoublesSketchConstantPostAggregator{name='constant_sketch', value='AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA='}", + postAgg.toString() + ); + } + + @Test + public void testComparator() + { + final PostAggregator postAgg = new ArrayOfDoublesSketchConstantPostAggregator( + "constant_sketch", + "AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA=" + ); + + Assert.assertEquals(Comparators.alwaysEqual(), postAgg.getComparator()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(ArrayOfDoublesSketchConstantPostAggregator.class) + .usingGetClass() + .verify(); + } +} + diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregatorTest.java new file mode 100755 index 00000000000..ed017280fd6 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToBase64StringPostAggregatorTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class ArrayOfDoublesSketchToBase64StringPostAggregatorTest +{ + @Test + public void testSerde() throws JsonProcessingException + { + final PostAggregator there = new ArrayOfDoublesSketchToBase64StringPostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + DefaultObjectMapper mapper = new DefaultObjectMapper(); + ArrayOfDoublesSketchToBase64StringPostAggregator andBackAgain = mapper.readValue( + mapper.writeValueAsString(there), + ArrayOfDoublesSketchToBase64StringPostAggregator.class + ); + + Assert.assertEquals(there, andBackAgain); + Assert.assertArrayEquals(there.getCacheKey(), andBackAgain.getCacheKey()); + } + + @Test + public void testToString() + { + final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + + Assert.assertEquals( + "ArrayOfDoublesSketchToBase64StringPostAggregator{name='a', field=ConstantPostAggregator{name='', constantValue=0}}", + postAgg.toString() + ); + } + + @Test + public void testCompute() + { + ArrayOfDoublesUpdatableSketch s1 = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(16) + .setNumberOfValues(2) + .build(); + + s1.update("foo", new double[] {1.0, 2.0}); + + PostAggregator field1 = EasyMock.createMock(PostAggregator.class); + EasyMock.expect(field1.compute(EasyMock.anyObject(Map.class))).andReturn(s1).anyTimes(); + EasyMock.replay(field1); + + final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator( + "a", + field1 + ); + Assert.assertNotNull("output string should not be null", postAgg.compute(ImmutableMap.of())); + } + + @Test + public void testComparator() + { + final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + Exception exception = Assert.assertThrows(IAE.class, () -> postAgg.getComparator()); + Assert.assertEquals("Comparing sketch summaries is not supported", exception.getMessage()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(ArrayOfDoublesSketchToBase64StringPostAggregator.class) + .withNonnullFields("name", "field") + .withIgnoredFields("dependentFields") + .usingGetClass() + .verify(); + } + + @Test + public void testResultArraySignature() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2000/3000") + .granularity(Granularities.HOUR) + .aggregators( + new CountAggregatorFactory("count") + ) + .postAggregators( + new ArrayOfDoublesSketchToBase64StringPostAggregator( + "a", + new ConstantPostAggregator("", 1) + ) + ) + .build(); + + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("count", ColumnType.LONG) + .add("a", ColumnType.STRING) + .build(), + new TimeseriesQueryQueryToolChest().resultArraySignature(query) + ); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregatorTest.java new file mode 100755 index 00000000000..7a0d75d211a --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMetricsSumEstimatePostAggregatorTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.tuple; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Map; + +public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregatorTest +{ + @Test + public void testSerde() throws JsonProcessingException + { + final PostAggregator there = new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + DefaultObjectMapper mapper = new DefaultObjectMapper(); + ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator andBackAgain = mapper.readValue( + mapper.writeValueAsString(there), + ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class + ); + + Assert.assertEquals(there, andBackAgain); + Assert.assertArrayEquals(there.getCacheKey(), andBackAgain.getCacheKey()); + } + + @Test + public void testToString() + { + PostAggregator postAgg = new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + + Assert.assertEquals( + "ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator{name='a', field=ConstantPostAggregator{name='', constantValue=0}}", + postAgg.toString() + ); + } + + @Test + public void testCompute() + { + ArrayOfDoublesUpdatableSketch s1 = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(16) + .setNumberOfValues(2) + .build(); + + s1.update("key1", new double[] {1.0, 2.0}); + s1.update("key2", new double[] {1.0, 2.0}); + s1.update("key3", new double[] {1.0, 2.0}); + s1.update("key4", new double[] {1.0, 2.0}); + + PostAggregator field1 = EasyMock.createMock(PostAggregator.class); + EasyMock.expect(field1.compute(EasyMock.anyObject(Map.class))).andReturn(s1).anyTimes(); + EasyMock.replay(field1); + + final PostAggregator postAgg = new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + "a", + field1 + ); + double[] expectedOutput = {4.0, 8.0}; + Assert.assertTrue(Arrays.equals(expectedOutput, (double[]) postAgg.compute(ImmutableMap.of()))); + } + + + @Test + public void testComparator() + { + final PostAggregator postAgg = new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + "a", + new ConstantPostAggregator("", 0) + ); + Exception exception = Assert.assertThrows(IAE.class, () -> postAgg.getComparator()); + Assert.assertEquals("Comparing arrays of estimate values is not supported", exception.getMessage()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class) + .withNonnullFields("name", "field") + .withIgnoredFields("dependentFields") + .usingGetClass() + .verify(); + } + + @Test + public void testResultArraySignature() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2000/3000") + .granularity(Granularities.HOUR) + .aggregators( + new CountAggregatorFactory("count") + ) + .postAggregators( + new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator( + "a", + new ConstantPostAggregator("", 0) + ) + ) + .build(); + + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("count", ColumnType.LONG) + .add("a", ColumnType.DOUBLE_ARRAY) + .build(), + new TimeseriesQueryQueryToolChest().resultArraySignature(query) + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java old mode 100644 new mode 100755 index 301baf4b967..98161bd37e0 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -149,6 +149,11 @@ public class AggregatorUtil public static final byte KLL_FLOATS_SKETCH_BUILD_CACHE_TYPE_ID = 0x4A; public static final byte KLL_FLOATS_SKETCH_MERGE_CACHE_TYPE_ID = 0x4B; + + public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID = 0x4C; + public static final byte ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID = 0x4D; + public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID = 0x4E; + /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg *