diff --git a/docs/content/development/extensions-core/datasketches-hll.md b/docs/content/development/extensions-core/datasketches-hll.md index 90e284f155b..f9868bd1c4e 100644 --- a/docs/content/development/extensions-core/datasketches-hll.md +++ b/docs/content/development/extensions-core/datasketches-hll.md @@ -41,7 +41,8 @@ druid.extensions.loadList=["druid-datasketches"] "name" : , "fieldName" : , "lgK" : , - "tgtHllType" : + "tgtHllType" : , + "round": } ``` @@ -51,7 +52,8 @@ druid.extensions.loadList=["druid-datasketches"] "name" : , "fieldName" : , "lgK" : , - "tgtHllType" : + "tgtHllType" : , + "round": } ``` @@ -62,6 +64,7 @@ druid.extensions.loadList=["druid-datasketches"] |fieldName|A String for the name of the input field.|yes| |lgK|log2 of K that is the number of buckets in the sketch, parameter that controls the size and the accuracy. Must be a power of 2 from 4 to 21 inclusively.|no, defaults to 12| |tgtHllType|The type of the target HLL sketch. Must be "HLL_4", "HLL_6" or "HLL_8" |no, defaults to "HLL_4"| +|round|Round off values to whole numbers. 
Only affects query-time behavior and is ignored at ingestion-time.|no, defaults to false| ### Post Aggregators diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index dbbeb5b7ae2..84afdcd5670 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -40,7 +40,6 @@ import java.util.Objects; */ public abstract class HllSketchAggregatorFactory extends AggregatorFactory { - public static final int DEFAULT_LG_K = 12; public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4; @@ -51,18 +50,21 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory private final String fieldName; private final int lgK; private final TgtHllType tgtHllType; + private final boolean round; HllSketchAggregatorFactory( final String name, final String fieldName, @Nullable final Integer lgK, - @Nullable final String tgtHllType + @Nullable final String tgtHllType, + final boolean round ) { this.name = Objects.requireNonNull(name); this.fieldName = Objects.requireNonNull(fieldName); this.lgK = lgK == null ? DEFAULT_LG_K : lgK; this.tgtHllType = tgtHllType == null ? 
DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType); + this.round = round; } @Override @@ -90,6 +92,12 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory return tgtHllType.toString(); } + @JsonProperty + public boolean isRound() + { + return round; + } + @Override public List requiredFields() { @@ -103,7 +111,9 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory @Override public List getRequiredColumns() { - return Collections.singletonList(new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString())); + return Collections.singletonList( + new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), round) + ); } @Override @@ -159,13 +169,19 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory @Nullable @Override - public Double finalizeComputation(@Nullable final Object object) + public Object finalizeComputation(@Nullable final Object object) { if (object == null) { return null; } final HllSketch sketch = (HllSketch) object; - return sketch.getEstimate(); + final double estimate = sketch.getEstimate(); + + if (round) { + return Math.round(estimate); + } else { + return estimate; + } } @Override @@ -177,14 +193,14 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory @Override public AggregatorFactory getCombiningFactory() { - return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType()); + return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType(), isRound()); } @Override public byte[] getCacheKey() { return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName) - .appendInt(lgK).appendInt(tgtHllType.ordinal()).build(); + .appendInt(lgK).appendInt(tgtHllType.ordinal()).build(); } @Override @@ -209,6 +225,9 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory if (!tgtHllType.equals(that.tgtHllType)) { 
return false; } + if (round != that.round) { + return false; + } return true; } @@ -222,11 +241,12 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory public String toString() { return getClass().getSimpleName() + " {" - + "name=" + name - + "fieldName=" + fieldName - + "lgK=" + lgK - + "tgtHllType=" + tgtHllType - + "}"; + + " name=" + name + + ", fieldName=" + fieldName + + ", lgK=" + lgK + + ", tgtHllType=" + tgtHllType + + ", round=" + round + + " }"; } protected abstract byte getCacheTypeId(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 8668b78a5f8..1f4307783a4 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -43,9 +43,11 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, - @JsonProperty("tgtHllType") @Nullable final String tgtHllType) + @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("round") final boolean round + ) { - super(name, fieldName, lgK, tgtHllType); + super(name, fieldName, lgK, tgtHllType, round); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 3f4cbc2609a..aac9eb0b5a1 100644 --- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -46,10 +46,11 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, - @JsonProperty("tgtHllType") @Nullable final String tgtHllType + @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType); + super(name, fieldName, lgK, tgtHllType, round); } @Override @@ -59,10 +60,11 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other; return new HllSketchMergeAggregatorFactory( - getName(), - getName(), - Math.max(getLgK(), castedOther.getLgK()), - getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType() + getName(), + getName(), + Math.max(getLgK(), castedOther.getLgK()), + getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? 
castedOther.getTgtHllType() : getTgtHllType(), + isRound() || castedOther.isRound() ); } else { throw new AggregatorFactoryNotMergeableException(this, other); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java index 56931b66b35..0bfccac1c2e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregator.java @@ -60,6 +60,7 @@ public class HllSketchSqlAggregator implements SqlAggregator { private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction(); private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL"; + private static final boolean ROUND = true; @Override public SqlAggFunction calciteFunction() @@ -134,8 +135,15 @@ public class HllSketchSqlAggregator implements SqlAggregator final AggregatorFactory aggregatorFactory; final String aggregatorName = finalizeAggregations ? 
Calcites.makePrefixedName(name, "a") : name; - if (columnArg.isDirectColumnAccess() && rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) { - aggregatorFactory = new HllSketchMergeAggregatorFactory(aggregatorName, columnArg.getDirectColumn(), logK, tgtHllType); + if (columnArg.isDirectColumnAccess() + && rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) { + aggregatorFactory = new HllSketchMergeAggregatorFactory( + aggregatorName, + columnArg.getDirectColumn(), + logK, + tgtHllType, + ROUND + ); } else { final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName(); final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); @@ -161,7 +169,8 @@ public class HllSketchSqlAggregator implements SqlAggregator aggregatorName, dimensionSpec.getDimension(), logK, - tgtHllType + tgtHllType, + ROUND ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java new file mode 100644 index 00000000000..71ac4d80ec1 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import com.yahoo.sketches.hll.HllSketch; +import com.yahoo.sketches.hll.TgtHllType; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class HllSketchAggregatorFactoryTest +{ + private static final String NAME = "name"; + private static final String FIELD_NAME = "fieldName"; + private static final int LG_K = HllSketchAggregatorFactory.DEFAULT_LG_K; + private static final String TGT_HLL_TYPE = TgtHllType.HLL_4.name(); + private static final boolean ROUND = true; + private static final double ESTIMATE = Math.PI; + + private TestHllSketchAggregatorFactory target; + + @Before + public void setUp() + { + target = new TestHllSketchAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND); + } + + @Test + public void testIsRound() + { + Assert.assertEquals(ROUND, target.isRound()); + } + + @Test + public void testGetRequiredColumns() + { + List aggregatorFactories = target.getRequiredColumns(); + Assert.assertEquals(1, aggregatorFactories.size()); + HllSketchAggregatorFactory aggregatorFactory = 
(HllSketchAggregatorFactory) aggregatorFactories.get(0); + Assert.assertEquals(FIELD_NAME, aggregatorFactory.getName()); + Assert.assertEquals(FIELD_NAME, aggregatorFactory.getFieldName()); + Assert.assertEquals(LG_K, aggregatorFactory.getLgK()); + Assert.assertEquals(TGT_HLL_TYPE, aggregatorFactory.getTgtHllType()); + Assert.assertEquals(ROUND, aggregatorFactory.isRound()); + } + + @Test + public void testFinalizeComputationNull() + { + Assert.assertNull(target.finalizeComputation(null)); + } + + @Test + public void testFinalizeComputationRound() + { + Object actual = target.finalizeComputation(getMockSketch()); + Assert.assertTrue(actual instanceof Long); + Assert.assertEquals(3L, actual); + } + + private static HllSketch getMockSketch() + { + HllSketch sketch = EasyMock.mock(HllSketch.class); + EasyMock.expect(sketch.getEstimate()).andReturn(ESTIMATE); + EasyMock.replay(sketch); + return sketch; + } + + @Test + public void testFinalizeComputatioNoRound() + { + TestHllSketchAggregatorFactory t = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + !ROUND + ); + Object actual = t.finalizeComputation(getMockSketch()); + Assert.assertTrue(actual instanceof Double); + Assert.assertEquals(ESTIMATE, actual); + } + + @Test + public void testEqualsSameObject() + { + Assert.assertEquals(target, target); + } + + @Test + public void testEqualsOtherNull() + { + Assert.assertNotEquals(target, null); + } + + @Test + public void testEqualsOtherDiffClass() + { + Assert.assertNotEquals(target, NAME); + } + + @Test + public void testEqualsOtherDiffName() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME + "-diff", + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + ROUND + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void testEqualsOtherDiffFieldName() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME + "-diff", + LG_K, + TGT_HLL_TYPE, + ROUND + ); + 
Assert.assertNotEquals(target, other); + } + + @Test + public void testEqualsOtherDiffLgK() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME, + LG_K + 1, + TGT_HLL_TYPE, + ROUND + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void testEqualsOtherDiffTgtHllType() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TgtHllType.HLL_8.name(), + ROUND + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void testEqualsOtherDiffRound() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + !ROUND + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void testEqualsOtherMatches() + { + TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + ROUND + ); + Assert.assertEquals(target, other); + } + + @Test + public void testToString() + { + String string = target.toString(); + List toStringFields = Arrays.stream(HllSketchAggregatorFactory.class.getDeclaredFields()) + .filter(HllSketchAggregatorFactoryTest::isToStringField) + .collect(Collectors.toList()); + + for (Field field : toStringFields) { + String expectedToken = formatFieldForToString(field); + Assert.assertTrue("Missing \"" + expectedToken + "\"", string.contains(expectedToken)); + } + } + + private static boolean isToStringField(Field field) + { + int modfiers = field.getModifiers(); + return Modifier.isPrivate(modfiers) && !Modifier.isStatic(modfiers) && Modifier.isFinal(modfiers); + } + + private static String formatFieldForToString(Field field) + { + return " " + field.getName() + "="; + } + + // Helper for testing abstract base class + private static class TestHllSketchAggregatorFactory extends HllSketchAggregatorFactory + { + private static final byte DUMMY_CACHE_TYPE_ID = 0; + private static final Aggregator 
DUMMY_AGGREGATOR = null; + private static final BufferAggregator DUMMY_BUFFER_AGGREGATOR = null; + private static final String DUMMY_TYPE_NAME = null; + private static final int DUMMY_SIZE = 0; + + TestHllSketchAggregatorFactory( + String name, + String fieldName, + @Nullable Integer lgK, + @Nullable String tgtHllType, + boolean round + ) + { + super(name, fieldName, lgK, tgtHllType, round); + } + + @Override + protected byte getCacheTypeId() + { + return DUMMY_CACHE_TYPE_ID; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return DUMMY_AGGREGATOR; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return DUMMY_BUFFER_AGGREGATOR; + } + + @Override + public String getTypeName() + { + return DUMMY_TYPE_NAME; + } + + @Override + public int getMaxIntermediateSize() + { + return DUMMY_SIZE; + } + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index 0490b34e61f..1dc64cc4fb8 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -19,6 +19,9 @@ package org.apache.druid.query.aggregation.datasketches.hll; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; @@ -34,12 +37,17 @@ import org.junit.runners.Parameterized; import java.io.File; import java.util.ArrayList; +import 
java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; @RunWith(Parameterized.class) public class HllSketchAggregatorTest { + private static final boolean ROUND = true; + private final AggregationTestHelper helper; @Rule @@ -57,7 +65,7 @@ public class HllSketchAggregatorTest { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[] {config}); + constructors.add(new Object[]{config}); } return constructors; } @@ -67,39 +75,16 @@ public class HllSketchAggregatorTest { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()), - String.join("\n", - "{", - " \"type\": \"string\",", - " \"parseSpec\": {", - " \"format\": \"tsv\",", - " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},", - " \"dimensionsSpec\": {", - " \"dimensions\": [\"dim\", \"multiDim\"],", - " \"dimensionExclusions\": [],", - " \"spatialDimensions\": []", - " },", - " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"sketch\"],", - " \"listDelimiter\": \",\"", - " }", - "}"), - String.join("\n", - "[", - " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}", - "]"), + buildParserJson( + Arrays.asList("dim", "multiDim"), + Arrays.asList("timestamp", "dim", "multiDim", "sketch") + ), + buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND), 0, // minTimestamp Granularities.NONE, 200, // maxRowCount - String.join("\n", - "{", - " \"queryType\": \"groupBy\",", - " \"dataSource\": \"test_datasource\",", - " \"granularity\": \"ALL\",", - " \"dimensions\": [],", - " \"aggregations\": [", - " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}", - " ],", - " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]", - "}")); + 
buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND) + ); List results = seq.toList(); Assert.assertEquals(1, results.size()); Row row = results.get(0); @@ -111,39 +96,16 @@ public class HllSketchAggregatorTest { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()), - String.join("\n", - "{", - " \"type\": \"string\",", - " \"parseSpec\": {", - " \"format\": \"tsv\",", - " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},", - " \"dimensionsSpec\": {", - " \"dimensions\": [\"dim\"],", - " \"dimensionExclusions\": [],", - " \"spatialDimensions\": []", - " },", - " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],", - " \"listDelimiter\": \",\"", - " }", - "}"), - String.join("\n", - "[", - " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"id\"}", - "]"), + buildParserJson( + Collections.singletonList("dim"), + Arrays.asList("timestamp", "dim", "multiDim", "id") + ), + buildAggregatorJson("HLLSketchBuild", "id", !ROUND), 0, // minTimestamp Granularities.NONE, 200, // maxRowCount - String.join("\n", - "{", - " \"queryType\": \"groupBy\",", - " \"dataSource\": \"test_datasource\",", - " \"granularity\": \"ALL\",", - " \"dimensions\": [],", - " \"aggregations\": [", - " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}", - " ],", - " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]", - "}")); + buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND) + ); List results = seq.toList(); Assert.assertEquals(1, results.size()); Row row = results.get(0); @@ -155,36 +117,16 @@ public class HllSketchAggregatorTest { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()), - String.join("\n", - "{", - " \"type\": \"string\",", - " \"parseSpec\": {", - " \"format\": \"tsv\",", - " \"timestampSpec\": 
{\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},", - " \"dimensionsSpec\": {", - " \"dimensions\": [\"dim\", \"multiDim\", \"id\"],", - " \"dimensionExclusions\": [],", - " \"spatialDimensions\": []", - " },", - " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],", - " \"listDelimiter\": \",\"", - " }", - "}"), + buildParserJson( + Arrays.asList("dim", "multiDim", "id"), + Arrays.asList("timestamp", "dim", "multiDim", "id") + ), "[]", 0, // minTimestamp Granularities.NONE, 200, // maxRowCount - String.join("\n", - "{", - " \"queryType\": \"groupBy\",", - " \"dataSource\": \"test_datasource\",", - " \"granularity\": \"ALL\",", - " \"dimensions\": [],", - " \"aggregations\": [", - " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"id\"}", - " ],", - " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]", - "}")); + buildGroupByQueryJson("HLLSketchBuild", "id", !ROUND) + ); List results = seq.toList(); Assert.assertEquals(1, results.size()); Row row = results.get(0); @@ -196,39 +138,149 @@ public class HllSketchAggregatorTest { Sequence seq = helper.createIndexAndRunQueryOnSegment( new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()), - String.join("\n", - "{", - " \"type\": \"string\",", - " \"parseSpec\": {", - " \"format\": \"tsv\",", - " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},", - " \"dimensionsSpec\": {", - " \"dimensions\": [\"dim\", \"multiDim\", \"id\"],", - " \"dimensionExclusions\": [],", - " \"spatialDimensions\": []", - " },", - " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],", - " \"listDelimiter\": \",\"", - " }", - "}"), + buildParserJson( + Arrays.asList("dim", "multiDim", "id"), + Arrays.asList("timestamp", "dim", "multiDim", "id") + ), "[]", 0, // minTimestamp Granularities.NONE, 200, // maxRowCount - String.join("\n", - "{", - " \"queryType\": \"groupBy\",", - " \"dataSource\": \"test_datasource\",", - " 
\"granularity\": \"ALL\",", - " \"dimensions\": [],", - " \"aggregations\": [", - " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"multiDim\"}", - " ],", - " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]", - "}")); + buildGroupByQueryJson("HLLSketchBuild", "multiDim", !ROUND) + ); List results = seq.toList(); Assert.assertEquals(1, results.size()); Row row = results.get(0); Assert.assertEquals(14, (double) row.getMetric("sketch"), 0.1); } + + @Test + public void roundBuildSketch() throws Exception + { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()), + buildParserJson( + Arrays.asList("dim", "multiDim", "id"), + Arrays.asList("timestamp", "dim", "multiDim", "id") + ), + "[]", + 0, // minTimestamp + Granularities.NONE, + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchBuild", "id", ROUND) + ); + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + Row row = results.get(0); + Assert.assertEquals(200L, (long) row.getMetric("sketch")); + } + + @Test + public void roundMergeSketch() throws Exception + { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()), + buildParserJson( + Arrays.asList("dim", "multiDim"), + Arrays.asList("timestamp", "dim", "multiDim", "sketch") + ), + buildAggregatorJson("HLLSketchMerge", "sketch", ROUND), + 0, // minTimestamp + Granularities.NONE, + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND) + ); + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + Row row = results.get(0); + Assert.assertEquals(200L, (long) row.getMetric("sketch")); + } + + private static String buildParserJson(List dimensions, List columns) + { + Map timestampSpec = ImmutableMap.of( + "column", "timestamp", + "format", "yyyyMMdd" + ); + Map dimensionsSpec = 
ImmutableMap.of( + "dimensions", dimensions, + "dimensionExclusions", Collections.emptyList(), + "spatialDimensions", Collections.emptyList() + ); + Map parseSpec = ImmutableMap.of( + "format", "tsv", + "timestampSpec", timestampSpec, + "dimensionsSpec", dimensionsSpec, + "columns", columns, + "listDelimiter", "," + ); + Map object = ImmutableMap.of( + "type", "string", + "parseSpec", parseSpec + ); + return toJson(object); + } + + private static String toJson(Object object) + { + final String json; + try { + ObjectMapper objectMapper = new ObjectMapper(); + json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return json; + } + + private static String buildAggregatorJson( + String aggregationType, + String aggregationFieldName, + boolean aggregationRound + ) + { + Map aggregator = buildAggregatorObject( + aggregationType, + aggregationFieldName, + aggregationRound + ); + return toJson(Collections.singletonList(aggregator)); + } + + private static Map buildAggregatorObject( + String aggregationType, + String aggregationFieldName, + boolean aggregationRound + ) + { + return ImmutableMap.of( + "type", aggregationType, + "name", "sketch", + "fieldName", aggregationFieldName, + "round", aggregationRound + ); + } + + private static String buildGroupByQueryJson( + String aggregationType, + String aggregationFieldName, + boolean aggregationRound + ) + { + Map aggregation = buildAggregatorObject( + aggregationType, + aggregationFieldName, + aggregationRound + ); + Map object = new ImmutableMap.Builder() + .put("queryType", "groupBy") + .put("dataSource", "test_dataSource") + .put("granularity", "ALL") + .put("dimensions", Collections.emptyList()) + .put("aggregations", Collections.singletonList(aggregation)) + .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z")) + .build(); + return toJson(object); + } } diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java new file mode 100644 index 00000000000..c08db6926c9 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import com.yahoo.sketches.hll.TgtHllType; +import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + /** Unit tests for HllSketchMergeAggregatorFactory.getMergingFactory: mismatch failures and the lgK, tgtHllType and round values reported by the merged factory. */ +public class HllSketchMergeAggregatorFactoryTest +{ + private static final String NAME = "name"; + private static final String FIELD_NAME = "fieldName"; + private static final int LG_K = 2; /* NOTE(review): 2 is below the 4 to 21 lgK range given in the extension docs; presumably acceptable because these tests never build a sketch. TODO confirm. */ + private static final String TGT_HLL_TYPE = TgtHllType.HLL_6.name(); + private static final boolean ROUND = true; + + private HllSketchMergeAggregatorFactory targetRound; + private HllSketchMergeAggregatorFactory targetNoRound; + /** Creates two merge factories that are built from the same arguments except for the round flag. */ + @Before + public void setUp() + { + targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND); + targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, !ROUND); + } + /** Merging with a factory constructed under a different name is expected to throw AggregatorFactoryNotMergeableException. */ + @Test(expected = AggregatorFactoryNotMergeableException.class) + public void testGetMergingFactoryBadName() throws Exception + { + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME + "-diff", + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + ROUND + ); + targetRound.getMergingFactory(other); + } + /** Merging with an HllSketchBuildAggregatorFactory built from otherwise identical arguments is expected to throw AggregatorFactoryNotMergeableException. */ + @Test(expected = AggregatorFactoryNotMergeableException.class) + public void testGetMergingFactoryBadType() throws Exception + { + HllSketchBuildAggregatorFactory other = new HllSketchBuildAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + ROUND + ); + targetRound.getMergingFactory(other); + } + /** When the other factory has a smaller lgK, the merged factory is expected to report the lgK of this factory. */ + @Test + public void testGetMergingFactoryOtherSmallerLgK() throws Exception + { + final int smallerLgK = LG_K - 1; + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + smallerLgK, + TGT_HLL_TYPE, + ROUND + ); + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); + Assert.assertEquals(LG_K, result.getLgK()); + } + /** When the other factory has a larger lgK, the merged factory is expected to report that larger lgK. */ + 
@Test + public void testGetMergingFactoryOtherLargerLgK() throws Exception + { + final int largerLgK = LG_K + 1; + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + largerLgK, + TGT_HLL_TYPE, + ROUND + ); + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); + Assert.assertEquals(largerLgK, result.getLgK()); + } + /** When the other factory uses HLL_4 and this one uses HLL_6, the merged factory is expected to report HLL_6. */ + @Test + public void testGetMergingFactoryOtherSmallerTgtHllType() throws Exception + { + String smallerTgtHllType = TgtHllType.HLL_4.name(); + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + smallerTgtHllType, + ROUND + ); + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); + Assert.assertEquals(TGT_HLL_TYPE, result.getTgtHllType()); + } + /** When the other factory uses HLL_8 and this one uses HLL_6, the merged factory is expected to report HLL_8. */ + @Test + public void testGetMergingFactoryOtherLargerTgtHllType() throws Exception + { + String largerTgtHllType = TgtHllType.HLL_8.name(); + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + largerTgtHllType, + ROUND + ); + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); + Assert.assertEquals(largerTgtHllType, result.getTgtHllType()); + } + /** Merging two non-rounding factories is expected to yield a non-rounding factory. */ + @Test + public void testGetMergingFactoryThisNoRoundOtherNoRound() throws Exception + { + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetNoRound.getMergingFactory(targetNoRound); + Assert.assertFalse(result.isRound()); + } + /** Merging a non-rounding factory with a rounding one is expected to yield a rounding factory. */ + @Test + public void testGetMergingFactoryThisNoRoundOtherRound() throws Exception + { + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetNoRound.getMergingFactory(targetRound); + Assert.assertTrue(result.isRound()); + } + /** Merging a rounding factory with a non-rounding one is expected to yield a rounding factory. */ + @Test + public void testGetMergingFactoryThisRoundOtherNoRound() throws Exception + { + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) 
targetRound.getMergingFactory(targetNoRound); + Assert.assertTrue(result.isRound()); + } + /** Merging two rounding factories is expected to yield a rounding factory. */ + @Test + public void testGetMergingFactoryThisRoundOtherRound() throws Exception + { + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(targetRound); + Assert.assertTrue(result.isRound()); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 70cccbad626..6a2f9801d7b 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -88,6 +88,7 @@ import java.util.Map; public class HllSketchSqlAggregatorTest extends CalciteTestBase { private static final String DATA_SOURCE = "foo"; + private static final boolean ROUND = true; private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; @@ -140,7 +141,8 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase "hllsketch_dim1", "dim1", null, - null + null, + ROUND ) ) .withRollup(false) @@ -265,14 +267,16 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase "a1", "dim2", null, - null + null, + ROUND ), new FilteredAggregatorFactory( new HllSketchBuildAggregatorFactory( "a2", "dim2", null, - null + null, + ROUND ), BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null)) ), @@ -280,16 +284,18 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase "a3", "v0", null, - null + null, + ROUND ), new HllSketchBuildAggregatorFactory( "a4", "v1", null, - null + null, + ROUND ), - new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8"), - new 
HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null) + new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", ROUND), + new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND) ) ) .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) @@ -306,7 +312,11 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase final String sql = "SELECT\n" + " AVG(u)\n" - + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT_DS_HLL(cnt) AS u FROM druid.foo GROUP BY 1)"; + + "FROM (" + + " SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT_DS_HLL(cnt) AS u\n" + + " FROM druid.foo\n" + + " GROUP BY 1\n" + + ")"; // Verify results final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); @@ -351,7 +361,8 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase "a0:a", "cnt", null, - null + null, + ROUND ) ) ) @@ -390,4 +401,22 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify query Assert.assertEquals(expected, actual); + } + /** Runs a GROUP BY dim2 query whose HAVING clause requires APPROX_COUNT_DISTINCT_DS_HLL(m1) = 2 and checks how many groups match: 1 when NullHandling.replaceWithDefault() is true, otherwise 2. NOTE(review): presumably the exact-equality HAVING only matches because the estimate is rounded to a whole number; confirm. */ + @Test + public void testApproxCountDistinctHllSketchIsRounded() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + + final String sql = "SELECT" + + " dim2," + + " APPROX_COUNT_DISTINCT_DS_HLL(m1)" + + " FROM druid.foo" + + " GROUP BY dim2" + + " HAVING APPROX_COUNT_DISTINCT_DS_HLL(m1) = 2"; + + // Verify results + final List results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + final int expected = NullHandling.replaceWithDefault() ? 1 : 2; + Assert.assertEquals(expected, results.size()); + } }