Add Post Aggregators for Tuple Sketches (#13819)

You can now do the following operations with TupleSketches in Post Aggregation Step

Get the Sketch Output as Base64 String
Provide a constant Tuple Sketch in post-aggregation step that can be used in Set Operations
Get the Estimated Value(Sum) of Summary/Metrics Objects associated with Tuple Sketch
This commit is contained in:
Anshu Makkar 2023-03-03 09:32:09 +05:30 committed by GitHub
parent b4b354b658
commit a10e4150d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 834 additions and 0 deletions

View File

@ -207,3 +207,46 @@ Returns a human-readable summary of a given ArrayOfDoublesSketch. This is a stri
"field" : <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```
### Constant ArrayOfDoublesSketch
This post aggregator adds a Base64-encoded constant ArrayOfDoublesSketch value that you can use in other post aggregators.
```json
{
"type": "arrayOfDoublesSketchConstant",
"name": DESTINATION_COLUMN_NAME,
"value": CONSTANT_SKETCH_VALUE
}
```
### Base64 output of ArrayOfDoublesSketch
This post aggregator outputs an ArrayOfDoublesSketch as a Base64-encoded string. You can use the resulting string as the `value` of a constant tuple sketch (`arrayOfDoublesSketchConstant`) in other post aggregators.
```json
{
"type": "arrayOfDoublesSketchToBase64String",
"name": DESTINATION_COLUMN_NAME,
"field": <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```
### Estimated metrics values for each column of ArrayOfDoublesSketch
For the key-value pairs in the given ArrayOfDoublesSketch, this post aggregator estimates the sum for each set of values across the keys. For example, the post aggregator returns `{3.0, 8.0}` for the following key-value pairs:
```
Key_1, {1.0, 3.0}
Key_2, {2.0, 5.0}
```
The post aggregator returns _N_ double values, where _N_ is the number of values associated with each key.
```json
{
"type": "arrayOfDoublesSketchToMetricsSumEstimate",
"name": DESTINATION_COLUMN_NAME,
"field": <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * Post-aggregator that yields a constant {@link ArrayOfDoublesSketch}.
 * The constant is supplied as a non-empty Base64 encoded string; it is decoded once, at construction time,
 * and the resulting sketch is returned unchanged from every {@link #compute(Map)} call.
 */
public class ArrayOfDoublesSketchConstantPostAggregator extends ArrayOfDoublesSketchPostAggregator
{
  // Raw Base64 text exactly as supplied by the caller.
  private final String value;
  // Sketch decoded from {@link #value}.
  private final ArrayOfDoublesSketch sketchValue;

  // NOTE(review): no getter in this class is annotated with @JsonProperty for "value"; confirm that the
  // constant survives a JSON round trip with the ObjectMapper configuration used by the project.
  /**
   * @param name  output name of this post-aggregator
   * @param value Base64 encoded sketch; must be neither null nor empty
   * @throws IllegalArgumentException if {@code value} is null or empty
   */
  @JsonCreator
  public ArrayOfDoublesSketchConstantPostAggregator(
      @JsonProperty("name") String name,
      @JsonProperty("value") String value
  )
  {
    super(name);
    final boolean hasValue = value != null && !value.isEmpty();
    Preconditions.checkArgument(
        hasValue,
        "Constant value cannot be null or empty, expecting base64 encoded sketch string"
    );
    this.value = value;
    this.sketchValue = ArrayOfDoublesSketchOperations.deserializeFromBase64EncodedStringSafe(value);
  }

  /**
   * A constant reads no aggregator output, so there are no dependent fields.
   */
  @Override
  public Set<String> getDependentFields()
  {
    return Collections.emptySet();
  }

  @Override
  public Comparator getComparator()
  {
    return Comparators.alwaysEqual();
  }

  /**
   * Returns the sketch decoded in the constructor; the argument is not consulted.
   */
  @Override
  public Object compute(Map<String, Object> combinedAggregators)
  {
    return getSketchValue();
  }

  /**
   * Returns this instance as-is.
   */
  @Override
  public ArrayOfDoublesSketchConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
  {
    return this;
  }

  public ArrayOfDoublesSketch getSketchValue()
  {
    return sketchValue;
  }

  @Override
  public String toString()
  {
    final String quotedName = "name='" + this.getName() + "'";
    final String quotedValue = "value='" + getRawSketchValue() + "'";
    return "ArrayOfDoublesSketchConstantPostAggregator{" + quotedName + ", " + quotedValue + "}";
  }

  private String getRawSketchValue()
  {
    return value;
  }

  // NOTE(review): the decoded sketch takes part in equals()/hashCode(). If ArrayOfDoublesSketch does not
  // override equals()/hashCode(), two instances built from the same name and Base64 string would compare
  // unequal (reference comparison) -- confirm whether that is intended.
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ArrayOfDoublesSketchConstantPostAggregator other = (ArrayOfDoublesSketchConstantPostAggregator) o;
    if (!Objects.equals(this.getName(), other.getName())) {
      return false;
    }
    if (!Objects.equals(this.getRawSketchValue(), other.getRawSketchValue())) {
      return false;
    }
    return Objects.equals(this.getSketchValue(), other.getSketchValue());
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), value, sketchValue);
  }

  /**
   * Builds the cache key from the type id and the SHA-1 hex digest of the raw Base64 text.
   */
  @Override
  public byte[] getCacheKey()
  {
    final String valueDigest = DigestUtils.sha1Hex(value);
    final CacheKeyBuilder keyBuilder =
        new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID);
    return keyBuilder.appendString(valueDigest).build();
  }
}

View File

@ -48,6 +48,13 @@ public class ArrayOfDoublesSketchModule implements DruidModule
public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG);
public static final ColumnType MERGE_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG);
// JSON "type" name under which ArrayOfDoublesSketchConstantPostAggregator is registered.
public static final String ARRAY_OF_DOUBLES_SKETCH_CONSTANT = "arrayOfDoublesSketchConstant";
// JSON "type" name under which ArrayOfDoublesSketchToBase64StringPostAggregator is registered.
public static final String ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING = "arrayOfDoublesSketchToBase64String";
// JSON "type" name under which ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator is registered.
public static final String ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE = "arrayOfDoublesSketchToMetricsSumEstimate";
@Override
public void configure(final Binder binder)
{
@ -100,6 +107,18 @@ public class ArrayOfDoublesSketchModule implements DruidModule
new NamedType(
ArrayOfDoublesSketchToStringPostAggregator.class,
"arrayOfDoublesSketchToString"
),
new NamedType(
ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE
),
new NamedType(
ArrayOfDoublesSketchConstantPostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_CONSTANT
),
new NamedType(
ArrayOfDoublesSketchToBase64StringPostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING
)
).addSerializer(ArrayOfDoublesSketch.class, new ArrayOfDoublesSketchJsonSerializer())
);

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import java.util.Comparator;
import java.util.Map;
/**
 * Post-aggregator that renders an {@link ArrayOfDoublesSketch} as a Base64 string.
 * The result is the Base64 encoding of the bytes returned by the sketch's {@code toByteArray()}.
 * This can be useful for debugging and for feeding the sketch output into other operations.
 */
public class ArrayOfDoublesSketchToBase64StringPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
{
  /**
   * @param name  output name of this post-aggregator
   * @param field post-aggregator whose result is the sketch to encode
   */
  @JsonCreator
  public ArrayOfDoublesSketchToBase64StringPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    super(name, field);
  }

  /**
   * Computes the wrapped field, serializes the resulting sketch and Base64 encodes the bytes.
   */
  @Override
  public String compute(final Map<String, Object> combinedAggregators)
  {
    final Object computed = getField().compute(combinedAggregators);
    final byte[] serialized = ((ArrayOfDoublesSketch) computed).toByteArray();
    return StringUtils.encodeBase64String(serialized);
  }

  @Override
  public ColumnType getType(ColumnInspector signature)
  {
    return ColumnType.STRING;
  }

  /**
   * Always throws: no ordering is defined for the encoded output.
   */
  @Override
  public Comparator<String> getComparator()
  {
    throw new IAE("Comparing sketch summaries is not supported");
  }

  /**
   * Builds the cache key from the type id followed by the wrapped field.
   */
  @Override
  public byte[] getCacheKey()
  {
    final CacheKeyBuilder keyBuilder =
        new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID);
    keyBuilder.appendCacheable(getField());
    return keyBuilder.build();
  }
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketchIterator;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
/**
 * Returns estimated sums of the metric columns of a given {@link ArrayOfDoublesSketch}.
 * The result holds N double values, where N is the number of double values kept in the sketch per key.
 * Each estimate is the sum of that column over the entries retained in the sketch, divided by the sketch's theta.
 */
public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
{
  /**
   * @param name  output name of this post-aggregator
   * @param field post-aggregator whose result is the sketch to summarize
   */
  @JsonCreator
  public ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    super(name, field);
  }

  /**
   * Sums every value column over the retained entries and scales each sum by {@code 1 / theta}.
   *
   * @return one estimate per value column, in column order
   */
  @Override
  public double[] compute(final Map<String, Object> combinedAggregators)
  {
    final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);

    // Only the running sum is needed, so plain accumulators are used instead of full summary statistics
    // (which would also maintain min, max, log-sum and variance for every value).
    final double[] sums = new double[sketch.getNumValues()];
    final ArrayOfDoublesSketchIterator it = sketch.iterator();
    while (it.next()) {
      final double[] values = it.getValues();
      for (int i = 0; i < values.length; i++) {
        sums[i] += values[i];
      }
    }

    // theta does not change while scaling, so read it once.
    final double theta = sketch.getTheta();
    final double[] estimates = new double[sums.length];
    Arrays.setAll(estimates, i -> sums[i] / theta);
    return estimates;
  }

  @Override
  public ColumnType getType(ColumnInspector signature)
  {
    return ColumnType.DOUBLE_ARRAY;
  }

  /**
   * Always throws: no ordering is defined for arrays of estimates.
   */
  @Override
  public Comparator<double[]> getComparator()
  {
    throw new IAE("Comparing arrays of estimate values is not supported");
  }

  /**
   * Builds the cache key from the type id followed by the wrapped field.
   */
  @Override
  public byte[] getCacheKey()
  {
    return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID)
        .appendCacheable(getField())
        .build();
  }
}

View File

@ -1244,4 +1244,97 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling
Assert.assertEquals(0.0, ds3.getMinValue(), 0);
Assert.assertEquals(3.0, ds3.getMaxValue(), 0);
}
/**
 * Exercises the constant tuple sketch, metrics sum estimate and Base64 string post-aggregators together.
 * The groupBy query computes the metrics sum estimate of the aggregated sketch, the metrics sum estimate of
 * its intersection with a constant sketch, and the Base64 form of that same intersection.
 */
@Test
public void testConstantAndBase64WithEstimateSumPostAgg() throws Exception
{
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("tuple/array_of_doubles_sketch_data_two_values.tsv")
.getFile()),
String.join(
"\n",
"{",
" \"type\": \"string\",",
" \"parseSpec\": {",
" \"format\": \"tsv\",",
" \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},",
" \"dimensionsSpec\": {",
" \"dimensions\": [\"product\"],",
" \"dimensionExclusions\": [],",
" \"spatialDimensions\": []",
" },",
" \"columns\": [\"timestamp\", \"product\", \"sketch\"]",
" }",
"}"
),
String.join(
"\n",
"[",
" {\"type\": \"arrayOfDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"nominalEntries\": 1024, \"numberOfValues\": 2}",
"]"
),
0, // minTimestamp
Granularities.NONE,
10, // maxRowCount
String.join(
"\n",
"{",
" \"queryType\": \"groupBy\",",
" \"dataSource\": \"test_datasource\",",
" \"granularity\": \"ALL\",",
" \"dimensions\": [],",
" \"aggregations\": [",
" {\"type\": \"arrayOfDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"nominalEntries\": 1024, \"numberOfValues\": 2}",
" ],",
" \"postAggregations\": [",
" {\"type\": \"arrayOfDoublesSketchToMetricsSumEstimate\", \"name\": \"estimateSum\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},",
" {\"type\": \"arrayOfDoublesSketchToMetricsSumEstimate\", \"name\": \"intersection\", \"field\": {",
" \"type\": \"arrayOfDoublesSketchSetOp\",",
" \"name\": \"intersection\",",
" \"operation\": \"INTERSECT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"arrayOfDoublesSketchConstant\", \"name\": \"external_sketch\", \"value\": \"AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAABA\"}]",
" }},",
" {\"type\": \"arrayOfDoublesSketchToBase64String\", \"name\": \"intersectionString\", \"field\": {",
" \"type\": \"arrayOfDoublesSketchSetOp\",",
" \"name\": \"intersection\",",
" \"operation\": \"INTERSECT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"arrayOfDoublesSketchConstant\", \"name\": \"external_sketch\", \"value\": \"AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAPA/AAAAAAAAAEAAAAAAAADwPwAAAAAAAABA\"}]",
" }}",
" ],",
" \"intervals\": [\"2015-01-01T00:00:00.000Z/2015-01-31T00:00:00.000Z\"]",
"}"
)
);
List<ResultRow> results = seq.toList();
// granularity ALL with no dimensions: a single result row is expected
Assert.assertEquals(1, results.size());
ResultRow row = results.get(0);
// index 0 holds the "sketch" aggregator, read here as a double
Assert.assertEquals("sketch", 40.0, (double) row.get(0), 0);
// estimateSum is a double[] with one entry per value column, so it cannot be compared as a single double
Object estimateSumObj = row.get(1); // estimateSum
Assert.assertTrue(estimateSumObj instanceof double[]);
double[] estimateSum = (double[]) estimateSumObj;
Assert.assertEquals(2, estimateSum.length);
Assert.assertEquals(40.0, estimateSum[0], 0);
Assert.assertEquals(80.0, estimateSum[1], 0);
// metrics sum estimate of the intersection with the constant sketch
Object intersectEstimateSumObj = row.get(2); // intersectEstimateSum
Assert.assertTrue(intersectEstimateSumObj instanceof double[]);
double[] intersectEstimateSum = (double[]) intersectEstimateSumObj;
Assert.assertEquals(2, intersectEstimateSum.length);
Assert.assertEquals(4.0, intersectEstimateSum[0], 0);
Assert.assertEquals(8.0, intersectEstimateSum[1], 0);
// the same intersection rendered as a Base64 string
Assert.assertEquals("intersectionString", "AQEJAwgCzJP/////////fwIAAAAAAAAAbakWvEpmYR4+utyjb2+2IAAAAAAAAABAAAAAAAAAEEAAAAAAAAAAQAAAAAAAABBA", (String) row.get(3));
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.PostAggregator;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link ArrayOfDoublesSketchConstantPostAggregator}.
 */
public class ArrayOfDoublesSketchConstantPostAggregatorTest
{
  private static final String OUTPUT_NAME = "constant_sketch";
  // Base64 encoded sketch shared by all tests in this class.
  private static final String BASE64_SKETCH =
      "AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA=";

  @Test
  public void testSketchValue()
  {
    final ArrayOfDoublesSketchConstantPostAggregator constant =
        new ArrayOfDoublesSketchConstantPostAggregator(OUTPUT_NAME, BASE64_SKETCH);
    Assert.assertNotNull(constant.getSketchValue());
  }

  @Test
  public void testToString()
  {
    final PostAggregator constant = new ArrayOfDoublesSketchConstantPostAggregator(OUTPUT_NAME, BASE64_SKETCH);
    final String expected =
        "ArrayOfDoublesSketchConstantPostAggregator{name='constant_sketch', value='AQEJAwgBzJP/////////fwIAAAAAAAAAzT6NGdX0aWUOJvS5EIhpLwAAAAAAAAAAAAAAAAAAAAA='}";
    Assert.assertEquals(expected, constant.toString());
  }

  @Test
  public void testComparator()
  {
    final PostAggregator constant = new ArrayOfDoublesSketchConstantPostAggregator(OUTPUT_NAME, BASE64_SKETCH);
    Assert.assertEquals(Comparators.alwaysEqual(), constant.getComparator());
  }

  @Test
  public void testEqualsAndHashCode()
  {
    EqualsVerifier.forClass(ArrayOfDoublesSketchConstantPostAggregator.class)
                  .usingGetClass()
                  .verify();
  }
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
/**
 * Unit tests for {@link ArrayOfDoublesSketchToBase64StringPostAggregator}.
 */
public class ArrayOfDoublesSketchToBase64StringPostAggregatorTest
{
  @Test
  public void testSerde() throws JsonProcessingException
  {
    final PostAggregator there = new ArrayOfDoublesSketchToBase64StringPostAggregator(
        "a",
        new ConstantPostAggregator("", 0)
    );
    DefaultObjectMapper mapper = new DefaultObjectMapper();
    ArrayOfDoublesSketchToBase64StringPostAggregator andBackAgain = mapper.readValue(
        mapper.writeValueAsString(there),
        ArrayOfDoublesSketchToBase64StringPostAggregator.class
    );

    Assert.assertEquals(there, andBackAgain);
    Assert.assertArrayEquals(there.getCacheKey(), andBackAgain.getCacheKey());
  }

  @Test
  public void testToString()
  {
    final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator(
        "a",
        new ConstantPostAggregator("", 0)
    );

    Assert.assertEquals(
        "ArrayOfDoublesSketchToBase64StringPostAggregator{name='a', field=ConstantPostAggregator{name='', constantValue=0}}",
        postAgg.toString()
    );
  }

  /**
   * The computed value must be exactly the Base64 encoding of the serialized input sketch,
   * not merely a non-null value.
   */
  @Test
  public void testCompute()
  {
    ArrayOfDoublesUpdatableSketch s1 = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(16)
                                                                                 .setNumberOfValues(2)
                                                                                 .build();

    s1.update("foo", new double[] {1.0, 2.0});

    // The mocked field hands the sketch above to the post-aggregator under test.
    PostAggregator field1 = EasyMock.createMock(PostAggregator.class);
    EasyMock.expect(field1.compute(EasyMock.anyObject(Map.class))).andReturn(s1).anyTimes();
    EasyMock.replay(field1);

    final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator(
        "a",
        field1
    );

    final Object encoded = postAgg.compute(ImmutableMap.of());
    Assert.assertNotNull("output string should not be null", encoded);
    // Fully-qualified to avoid adding an import to this file.
    Assert.assertEquals(
        "output should be the Base64 encoding of the sketch bytes",
        org.apache.druid.java.util.common.StringUtils.encodeBase64String(s1.toByteArray()),
        encoded
    );
  }

  @Test
  public void testComparator()
  {
    final PostAggregator postAgg = new ArrayOfDoublesSketchToBase64StringPostAggregator(
        "a",
        new ConstantPostAggregator("", 0)
    );
    Exception exception = Assert.assertThrows(IAE.class, () -> postAgg.getComparator());
    Assert.assertEquals("Comparing sketch summaries is not supported", exception.getMessage());
  }

  @Test
  public void testEqualsAndHashCode()
  {
    EqualsVerifier.forClass(ArrayOfDoublesSketchToBase64StringPostAggregator.class)
                  .withNonnullFields("name", "field")
                  .withIgnoredFields("dependentFields")
                  .usingGetClass()
                  .verify();
  }

  @Test
  public void testResultArraySignature()
  {
    final TimeseriesQuery query =
        Druids.newTimeseriesQueryBuilder()
              .dataSource("dummy")
              .intervals("2000/3000")
              .granularity(Granularities.HOUR)
              .aggregators(
                  new CountAggregatorFactory("count")
              )
              .postAggregators(
                  new ArrayOfDoublesSketchToBase64StringPostAggregator(
                      "a",
                      new ConstantPostAggregator("", 1)
                  )
              )
              .build();

    Assert.assertEquals(
        RowSignature.builder()
                    .addTimeColumn()
                    .add("count", ColumnType.LONG)
                    .add("a", ColumnType.STRING)
                    .build(),
        new TimeseriesQueryQueryToolChest().resultArraySignature(query)
    );
  }
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
 * Unit tests for {@link ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator}.
 */
public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregatorTest
{
  @Test
  public void testSerde() throws JsonProcessingException
  {
    final PostAggregator original = estimatorOver(new ConstantPostAggregator("", 0));
    final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
    final String json = jsonMapper.writeValueAsString(original);
    final ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator roundTripped = jsonMapper.readValue(
        json,
        ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class
    );

    Assert.assertEquals(original, roundTripped);
    Assert.assertArrayEquals(original.getCacheKey(), roundTripped.getCacheKey());
  }

  @Test
  public void testToString()
  {
    final PostAggregator estimator = estimatorOver(new ConstantPostAggregator("", 0));
    final String expected =
        "ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator{name='a', field=ConstantPostAggregator{name='', constantValue=0}}";

    Assert.assertEquals(expected, estimator.toString());
  }

  @Test
  public void testCompute()
  {
    final ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder()
        .setNominalEntries(16)
        .setNumberOfValues(2)
        .build();

    // Four distinct keys, each carrying the same pair of values.
    for (String key : new String[]{"key1", "key2", "key3", "key4"}) {
      sketch.update(key, new double[]{1.0, 2.0});
    }

    // The mocked field hands the sketch above to the post-aggregator under test.
    final PostAggregator sketchField = EasyMock.createMock(PostAggregator.class);
    EasyMock.expect(sketchField.compute(EasyMock.anyObject(Map.class))).andReturn(sketch).anyTimes();
    EasyMock.replay(sketchField);

    final PostAggregator estimator = estimatorOver(sketchField);

    final double[] expected = {4.0, 8.0};
    final double[] actual = (double[]) estimator.compute(ImmutableMap.of());
    Assert.assertTrue(Arrays.equals(expected, actual));
  }

  @Test
  public void testComparator()
  {
    final PostAggregator estimator = estimatorOver(new ConstantPostAggregator("", 0));
    final Exception thrown = Assert.assertThrows(IAE.class, () -> estimator.getComparator());
    Assert.assertEquals("Comparing arrays of estimate values is not supported", thrown.getMessage());
  }

  @Test
  public void testEqualsAndHashCode()
  {
    EqualsVerifier.forClass(ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class)
                  .withNonnullFields("name", "field")
                  .withIgnoredFields("dependentFields")
                  .usingGetClass()
                  .verify();
  }

  @Test
  public void testResultArraySignature()
  {
    final PostAggregator estimator = estimatorOver(new ConstantPostAggregator("", 0));
    final TimeseriesQuery query =
        Druids.newTimeseriesQueryBuilder()
              .dataSource("dummy")
              .intervals("2000/3000")
              .granularity(Granularities.HOUR)
              .aggregators(new CountAggregatorFactory("count"))
              .postAggregators(estimator)
              .build();

    final RowSignature expected = RowSignature.builder()
                                              .addTimeColumn()
                                              .add("count", ColumnType.LONG)
                                              .add("a", ColumnType.DOUBLE_ARRAY)
                                              .build();

    Assert.assertEquals(expected, new TimeseriesQueryQueryToolChest().resultArraySignature(query));
  }

  /**
   * Creates the post-aggregator under test, named "a", on top of the given field.
   */
  private static PostAggregator estimatorOver(final PostAggregator field)
  {
    return new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator("a", field);
  }
}

View File

@ -149,6 +149,11 @@ public class AggregatorUtil
public static final byte KLL_FLOATS_SKETCH_BUILD_CACHE_TYPE_ID = 0x4A;
public static final byte KLL_FLOATS_SKETCH_MERGE_CACHE_TYPE_ID = 0x4B;
// Cache type id that starts the cache key of ArrayOfDoublesSketchToBase64StringPostAggregator.
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID = 0x4C;
// Cache type id that starts the cache key of ArrayOfDoublesSketchConstantPostAggregator.
public static final byte ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID = 0x4D;
// Cache type id that starts the cache key of ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID = 0x4E;
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*