Add round support for DS-HLL (#8023)

* Add round support for DS-HLL

Since the Cardinality aggregator has a "round" option to round off estimated
values generated from the HyperLogLog algorithm, add the same "round" option to
the DataSketches HLL Sketch module aggregators for consistency (an example
aggregator spec is sketched after these notes).

* Fix checkstyle errors

* Change HllSketchSqlAggregator to do rounding

* Fix test for standard-compliant null handling mode
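
For illustration, a minimal HLLSketchBuild aggregator spec using the new option could look like the sketch below. The field names follow the documentation change in this patch; the metric and column names ("unique_users", "user_id") are made up for the example.

```
{
  "type": "HLLSketchBuild",
  "name": "unique_users",
  "fieldName": "user_id",
  "lgK": 12,
  "tgtHllType": "HLL_4",
  "round": true
}
```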
Author: Chi Cao Minh, 2019-07-05 15:37:58 -07:00; committed by Gian Merlino
parent 9b499df14e
commit 0ded0ce414
9 changed files with 707 additions and 147 deletions

File: datasketches-hll.md (DataSketches HLL Sketch module docs)

@@ -41,7 +41,8 @@ druid.extensions.loadList=["druid-datasketches"]
   "name" : <output name>,
   "fieldName" : <metric name>,
   "lgK" : <size and accuracy parameter>,
-  "tgtHllType" : <target HLL type>
+  "tgtHllType" : <target HLL type>,
+  "round": <false | true>
 }
 ```
@@ -51,7 +52,8 @@ druid.extensions.loadList=["druid-datasketches"]
   "name" : <output name>,
   "fieldName" : <metric name>,
   "lgK" : <size and accuracy parameter>,
-  "tgtHllType" : <target HLL type>
+  "tgtHllType" : <target HLL type>,
+  "round": <false | true>
 }
 ```
@@ -62,6 +64,7 @@ druid.extensions.loadList=["druid-datasketches"]
 |fieldName|A String for the name of the input field.|yes|
 |lgK|log2 of K that is the number of buckets in the sketch, parameter that controls the size and the accuracy. Must be a power of 2 from 4 to 21 inclusively.|no, defaults to 12|
 |tgtHllType|The type of the target HLL sketch. Must be "HLL&lowbar;4", "HLL&lowbar;6" or "HLL&lowbar;8" |no, defaults to "HLL&lowbar;4"|
+|round|Round off values to whole numbers. Only affects query-time behavior and is ignored at ingestion-time.|no, defaults to false|

 ### Post Aggregators

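Since "round" only affects query-time behavior, the option is typically set on the aggregator inside a query. Below is a sketch of a groupBy query using it, mirroring the query that HllSketchAggregatorTest builds; the datasource name and interval are test values, not requirements.

```
{
  "queryType": "groupBy",
  "dataSource": "test_datasource",
  "granularity": "ALL",
  "dimensions": [],
  "aggregations": [
    { "type": "HLLSketchMerge", "name": "sketch", "fieldName": "sketch", "round": true }
  ],
  "intervals": ["2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"]
}
```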
File: HllSketchAggregatorFactory.java

@@ -40,7 +40,6 @@ import java.util.Objects;
  */
 public abstract class HllSketchAggregatorFactory extends AggregatorFactory
 {
   public static final int DEFAULT_LG_K = 12;
   public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;
@@ -51,18 +50,21 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
   private final String fieldName;
   private final int lgK;
   private final TgtHllType tgtHllType;
+  private final boolean round;

   HllSketchAggregatorFactory(
       final String name,
       final String fieldName,
       @Nullable final Integer lgK,
-      @Nullable final String tgtHllType
+      @Nullable final String tgtHllType,
+      final boolean round
   )
   {
     this.name = Objects.requireNonNull(name);
     this.fieldName = Objects.requireNonNull(fieldName);
     this.lgK = lgK == null ? DEFAULT_LG_K : lgK;
     this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType);
+    this.round = round;
   }

   @Override
@@ -90,6 +92,12 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
     return tgtHllType.toString();
   }

+  @JsonProperty
+  public boolean isRound()
+  {
+    return round;
+  }
+
   @Override
   public List<String> requiredFields()
   {
@@ -103,7 +111,9 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString()));
+    return Collections.singletonList(
+        new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), round)
+    );
   }

   @Override
@@ -159,13 +169,19 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
   @Nullable
   @Override
-  public Double finalizeComputation(@Nullable final Object object)
+  public Object finalizeComputation(@Nullable final Object object)
   {
     if (object == null) {
       return null;
     }
     final HllSketch sketch = (HllSketch) object;
-    return sketch.getEstimate();
+    final double estimate = sketch.getEstimate();
+    if (round) {
+      return Math.round(estimate);
+    } else {
+      return estimate;
+    }
   }

   @Override
@@ -177,14 +193,14 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType());
+    return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType(), isRound());
   }

   @Override
   public byte[] getCacheKey()
   {
     return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName)
                                                 .appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
   }

   @Override
@@ -209,6 +225,9 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
     if (!tgtHllType.equals(that.tgtHllType)) {
       return false;
     }
+    if (round != that.round) {
+      return false;
+    }
     return true;
   }
@@ -222,11 +241,12 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
   public String toString()
   {
     return getClass().getSimpleName() + " {"
-        + "name=" + name
-        + "fieldName=" + fieldName
-        + "lgK=" + lgK
-        + "tgtHllType=" + tgtHllType
-        + "}";
+        + " name=" + name
+        + ", fieldName=" + fieldName
+        + ", lgK=" + lgK
+        + ", tgtHllType=" + tgtHllType
+        + ", round=" + round
+        + " }";
   }

   protected abstract byte getCacheTypeId();

File: HllSketchBuildAggregatorFactory.java

@@ -43,9 +43,11 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
       @JsonProperty("name") final String name,
       @JsonProperty("fieldName") final String fieldName,
       @JsonProperty("lgK") @Nullable final Integer lgK,
-      @JsonProperty("tgtHllType") @Nullable final String tgtHllType)
+      @JsonProperty("tgtHllType") @Nullable final String tgtHllType,
+      @JsonProperty("round") final boolean round
+  )
   {
-    super(name, fieldName, lgK, tgtHllType);
+    super(name, fieldName, lgK, tgtHllType, round);
   }

   @Override

File: HllSketchMergeAggregatorFactory.java

@@ -46,10 +46,11 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
       @JsonProperty("name") final String name,
       @JsonProperty("fieldName") final String fieldName,
       @JsonProperty("lgK") @Nullable final Integer lgK,
-      @JsonProperty("tgtHllType") @Nullable final String tgtHllType
+      @JsonProperty("tgtHllType") @Nullable final String tgtHllType,
+      @JsonProperty("round") final boolean round
   )
   {
-    super(name, fieldName, lgK, tgtHllType);
+    super(name, fieldName, lgK, tgtHllType, round);
   }

   @Override
@@ -59,10 +60,11 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
       HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other;
       return new HllSketchMergeAggregatorFactory(
           getName(),
           getName(),
           Math.max(getLgK(), castedOther.getLgK()),
-          getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType()
+          getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(),
+          isRound() || castedOther.isRound()
       );
     } else {
       throw new AggregatorFactoryNotMergeableException(this, other);

File: HllSketchSqlAggregator.java

@@ -60,6 +60,7 @@ public class HllSketchSqlAggregator
 {
   private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction();
   private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";
+  private static final boolean ROUND = true;

   @Override
   public SqlAggFunction calciteFunction()
@@ -134,8 +135,15 @@
     final AggregatorFactory aggregatorFactory;
     final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;

-    if (columnArg.isDirectColumnAccess() && rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) {
-      aggregatorFactory = new HllSketchMergeAggregatorFactory(aggregatorName, columnArg.getDirectColumn(), logK, tgtHllType);
+    if (columnArg.isDirectColumnAccess()
+        && rowSignature.getColumnType(columnArg.getDirectColumn()) == ValueType.COMPLEX) {
+      aggregatorFactory = new HllSketchMergeAggregatorFactory(
+          aggregatorName,
+          columnArg.getDirectColumn(),
+          logK,
+          tgtHllType,
+          ROUND
+      );
     } else {
       final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName();
       final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
@@ -161,7 +169,8 @@
           aggregatorName,
           dimensionSpec.getDimension(),
           logK,
-          tgtHllType
+          tgtHllType,
+          ROUND
       );
     }

File: HllSketchAggregatorFactoryTest.java (new file)

@@ -0,0 +1,284 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import com.yahoo.sketches.hll.HllSketch;
import com.yahoo.sketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class HllSketchAggregatorFactoryTest
{
private static final String NAME = "name";
private static final String FIELD_NAME = "fieldName";
private static final int LG_K = HllSketchAggregatorFactory.DEFAULT_LG_K;
private static final String TGT_HLL_TYPE = TgtHllType.HLL_4.name();
private static final boolean ROUND = true;
private static final double ESTIMATE = Math.PI;
private TestHllSketchAggregatorFactory target;
@Before
public void setUp()
{
target = new TestHllSketchAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND);
}
@Test
public void testIsRound()
{
Assert.assertEquals(ROUND, target.isRound());
}
@Test
public void testGetRequiredColumns()
{
List<AggregatorFactory> aggregatorFactories = target.getRequiredColumns();
Assert.assertEquals(1, aggregatorFactories.size());
HllSketchAggregatorFactory aggregatorFactory = (HllSketchAggregatorFactory) aggregatorFactories.get(0);
Assert.assertEquals(FIELD_NAME, aggregatorFactory.getName());
Assert.assertEquals(FIELD_NAME, aggregatorFactory.getFieldName());
Assert.assertEquals(LG_K, aggregatorFactory.getLgK());
Assert.assertEquals(TGT_HLL_TYPE, aggregatorFactory.getTgtHllType());
Assert.assertEquals(ROUND, aggregatorFactory.isRound());
}
@Test
public void testFinalizeComputationNull()
{
Assert.assertNull(target.finalizeComputation(null));
}
@Test
public void testFinalizeComputationRound()
{
Object actual = target.finalizeComputation(getMockSketch());
Assert.assertTrue(actual instanceof Long);
Assert.assertEquals(3L, actual);
}
private static HllSketch getMockSketch()
{
HllSketch sketch = EasyMock.mock(HllSketch.class);
EasyMock.expect(sketch.getEstimate()).andReturn(ESTIMATE);
EasyMock.replay(sketch);
return sketch;
}
@Test
public void testFinalizeComputationNoRound()
{
TestHllSketchAggregatorFactory t = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
!ROUND
);
Object actual = t.finalizeComputation(getMockSketch());
Assert.assertTrue(actual instanceof Double);
Assert.assertEquals(ESTIMATE, actual);
}
@Test
public void testEqualsSameObject()
{
Assert.assertEquals(target, target);
}
@Test
public void testEqualsOtherNull()
{
Assert.assertNotEquals(target, null);
}
@Test
public void testEqualsOtherDiffClass()
{
Assert.assertNotEquals(target, NAME);
}
@Test
public void testEqualsOtherDiffName()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME + "-diff",
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
ROUND
);
Assert.assertNotEquals(target, other);
}
@Test
public void testEqualsOtherDiffFieldName()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME + "-diff",
LG_K,
TGT_HLL_TYPE,
ROUND
);
Assert.assertNotEquals(target, other);
}
@Test
public void testEqualsOtherDiffLgK()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME,
LG_K + 1,
TGT_HLL_TYPE,
ROUND
);
Assert.assertNotEquals(target, other);
}
@Test
public void testEqualsOtherDiffTgtHllType()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TgtHllType.HLL_8.name(),
ROUND
);
Assert.assertNotEquals(target, other);
}
@Test
public void testEqualsOtherDiffRound()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
!ROUND
);
Assert.assertNotEquals(target, other);
}
@Test
public void testEqualsOtherMatches()
{
TestHllSketchAggregatorFactory other = new TestHllSketchAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
ROUND
);
Assert.assertEquals(target, other);
}
@Test
public void testToString()
{
String string = target.toString();
List<Field> toStringFields = Arrays.stream(HllSketchAggregatorFactory.class.getDeclaredFields())
.filter(HllSketchAggregatorFactoryTest::isToStringField)
.collect(Collectors.toList());
for (Field field : toStringFields) {
String expectedToken = formatFieldForToString(field);
Assert.assertTrue("Missing \"" + expectedToken + "\"", string.contains(expectedToken));
}
}
private static boolean isToStringField(Field field)
{
int modifiers = field.getModifiers();
return Modifier.isPrivate(modifiers) && !Modifier.isStatic(modifiers) && Modifier.isFinal(modifiers);
}
private static String formatFieldForToString(Field field)
{
return " " + field.getName() + "=";
}
// Helper for testing abstract base class
private static class TestHllSketchAggregatorFactory extends HllSketchAggregatorFactory
{
private static final byte DUMMY_CACHE_TYPE_ID = 0;
private static final Aggregator DUMMY_AGGREGATOR = null;
private static final BufferAggregator DUMMY_BUFFER_AGGREGATOR = null;
private static final String DUMMY_TYPE_NAME = null;
private static final int DUMMY_SIZE = 0;
TestHllSketchAggregatorFactory(
String name,
String fieldName,
@Nullable Integer lgK,
@Nullable String tgtHllType,
boolean round
)
{
super(name, fieldName, lgK, tgtHllType, round);
}
@Override
protected byte getCacheTypeId()
{
return DUMMY_CACHE_TYPE_ID;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return DUMMY_AGGREGATOR;
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return DUMMY_BUFFER_AGGREGATOR;
}
@Override
public String getTypeName()
{
return DUMMY_TYPE_NAME;
}
@Override
public int getMaxIntermediateSize()
{
return DUMMY_SIZE;
}
}
}

File: HllSketchAggregatorTest.java

@@ -19,6 +19,9 @@
 package org.apache.druid.query.aggregation.datasketches.hll;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -34,12 +37,17 @@ import org.junit.runners.Parameterized;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 @RunWith(Parameterized.class)
 public class HllSketchAggregatorTest
 {
+  private static final boolean ROUND = true;

   private final AggregationTestHelper helper;

   @Rule
@@ -57,7 +65,7 @@
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[] {config});
+      constructors.add(new Object[]{config});
     }
     return constructors;
   }
@@ -67,39 +75,16 @@
   {
     Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
         new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
-        String.join("\n",
-            "{",
-            " \"type\": \"string\",",
-            " \"parseSpec\": {",
-            " \"format\": \"tsv\",",
-            " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},",
-            " \"dimensionsSpec\": {",
-            " \"dimensions\": [\"dim\", \"multiDim\"],",
-            " \"dimensionExclusions\": [],",
-            " \"spatialDimensions\": []",
-            " },",
-            " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"sketch\"],",
-            " \"listDelimiter\": \",\"",
-            " }",
-            "}"),
-        String.join("\n",
-            "[",
-            " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}",
-            "]"),
+        buildParserJson(
+            Arrays.asList("dim", "multiDim"),
+            Arrays.asList("timestamp", "dim", "multiDim", "sketch")
+        ),
+        buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND),
         0, // minTimestamp
         Granularities.NONE,
         200, // maxRowCount
-        String.join("\n",
-            "{",
-            " \"queryType\": \"groupBy\",",
-            " \"dataSource\": \"test_datasource\",",
-            " \"granularity\": \"ALL\",",
-            " \"dimensions\": [],",
-            " \"aggregations\": [",
-            " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}",
-            " ],",
-            " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]",
-            "}"));
+        buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND)
+    );
     List<Row> results = seq.toList();
     Assert.assertEquals(1, results.size());
     Row row = results.get(0);
@@ -111,39 +96,16 @@
   {
     Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
         new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
-        String.join("\n",
-            "{",
-            " \"type\": \"string\",",
-            " \"parseSpec\": {",
-            " \"format\": \"tsv\",",
-            " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},",
-            " \"dimensionsSpec\": {",
-            " \"dimensions\": [\"dim\"],",
-            " \"dimensionExclusions\": [],",
-            " \"spatialDimensions\": []",
-            " },",
-            " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],",
-            " \"listDelimiter\": \",\"",
-            " }",
-            "}"),
-        String.join("\n",
-            "[",
-            " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"id\"}",
-            "]"),
+        buildParserJson(
+            Collections.singletonList("dim"),
+            Arrays.asList("timestamp", "dim", "multiDim", "id")
+        ),
+        buildAggregatorJson("HLLSketchBuild", "id", !ROUND),
         0, // minTimestamp
         Granularities.NONE,
         200, // maxRowCount
-        String.join("\n",
-            "{",
-            " \"queryType\": \"groupBy\",",
-            " \"dataSource\": \"test_datasource\",",
-            " \"granularity\": \"ALL\",",
-            " \"dimensions\": [],",
-            " \"aggregations\": [",
-            " {\"type\": \"HLLSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\"}",
-            " ],",
-            " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]",
-            "}"));
+        buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND)
+    );
     List<Row> results = seq.toList();
     Assert.assertEquals(1, results.size());
     Row row = results.get(0);
@@ -155,36 +117,16 @@
   {
     Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
         new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
-        String.join("\n",
-            "{",
-            " \"type\": \"string\",",
-            " \"parseSpec\": {",
-            " \"format\": \"tsv\",",
-            " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},",
-            " \"dimensionsSpec\": {",
-            " \"dimensions\": [\"dim\", \"multiDim\", \"id\"],",
-            " \"dimensionExclusions\": [],",
-            " \"spatialDimensions\": []",
-            " },",
-            " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],",
-            " \"listDelimiter\": \",\"",
-            " }",
-            "}"),
+        buildParserJson(
+            Arrays.asList("dim", "multiDim", "id"),
+            Arrays.asList("timestamp", "dim", "multiDim", "id")
+        ),
         "[]",
         0, // minTimestamp
         Granularities.NONE,
         200, // maxRowCount
-        String.join("\n",
-            "{",
-            " \"queryType\": \"groupBy\",",
-            " \"dataSource\": \"test_datasource\",",
-            " \"granularity\": \"ALL\",",
-            " \"dimensions\": [],",
-            " \"aggregations\": [",
-            " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"id\"}",
-            " ],",
-            " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]",
-            "}"));
+        buildGroupByQueryJson("HLLSketchBuild", "id", !ROUND)
+    );
     List<Row> results = seq.toList();
     Assert.assertEquals(1, results.size());
     Row row = results.get(0);
@@ -196,39 +138,149 @@
   {
     Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
         new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
-        String.join("\n",
-            "{",
-            " \"type\": \"string\",",
-            " \"parseSpec\": {",
-            " \"format\": \"tsv\",",
-            " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMdd\"},",
-            " \"dimensionsSpec\": {",
-            " \"dimensions\": [\"dim\", \"multiDim\", \"id\"],",
-            " \"dimensionExclusions\": [],",
-            " \"spatialDimensions\": []",
-            " },",
-            " \"columns\": [\"timestamp\", \"dim\", \"multiDim\", \"id\"],",
-            " \"listDelimiter\": \",\"",
-            " }",
-            "}"),
+        buildParserJson(
+            Arrays.asList("dim", "multiDim", "id"),
+            Arrays.asList("timestamp", "dim", "multiDim", "id")
+        ),
         "[]",
         0, // minTimestamp
         Granularities.NONE,
         200, // maxRowCount
-        String.join("\n",
-            "{",
-            " \"queryType\": \"groupBy\",",
-            " \"dataSource\": \"test_datasource\",",
-            " \"granularity\": \"ALL\",",
-            " \"dimensions\": [],",
-            " \"aggregations\": [",
-            " {\"type\": \"HLLSketchBuild\", \"name\": \"sketch\", \"fieldName\": \"multiDim\"}",
-            " ],",
-            " \"intervals\": [\"2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z\"]",
-            "}"));
+        buildGroupByQueryJson("HLLSketchBuild", "multiDim", !ROUND)
+    );
     List<Row> results = seq.toList();
     Assert.assertEquals(1, results.size());
     Row row = results.get(0);
     Assert.assertEquals(14, (double) row.getMetric("sketch"), 0.1);
   }
@Test
public void roundBuildSketch() throws Exception
{
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_raw.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
Arrays.asList("timestamp", "dim", "multiDim", "id")
),
"[]",
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "id", ROUND)
);
List<Row> results = seq.toList();
Assert.assertEquals(1, results.size());
Row row = results.get(0);
Assert.assertEquals(200L, (long) row.getMetric("sketch"));
}
@Test
public void roundMergeSketch() throws Exception
{
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim"),
Arrays.asList("timestamp", "dim", "multiDim", "sketch")
),
buildAggregatorJson("HLLSketchMerge", "sketch", ROUND),
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND)
);
List<Row> results = seq.toList();
Assert.assertEquals(1, results.size());
Row row = results.get(0);
Assert.assertEquals(200L, (long) row.getMetric("sketch"));
}
private static String buildParserJson(List<String> dimensions, List<String> columns)
{
Map<String, Object> timestampSpec = ImmutableMap.of(
"column", "timestamp",
"format", "yyyyMMdd"
);
Map<String, Object> dimensionsSpec = ImmutableMap.of(
"dimensions", dimensions,
"dimensionExclusions", Collections.emptyList(),
"spatialDimensions", Collections.emptyList()
);
Map<String, Object> parseSpec = ImmutableMap.of(
"format", "tsv",
"timestampSpec", timestampSpec,
"dimensionsSpec", dimensionsSpec,
"columns", columns,
"listDelimiter", ","
);
Map<String, Object> object = ImmutableMap.of(
"type", "string",
"parseSpec", parseSpec
);
return toJson(object);
}
private static String toJson(Object object)
{
final String json;
try {
ObjectMapper objectMapper = new ObjectMapper();
json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return json;
}
private static String buildAggregatorJson(
String aggregationType,
String aggregationFieldName,
boolean aggregationRound
)
{
Map<String, Object> aggregator = buildAggregatorObject(
aggregationType,
aggregationFieldName,
aggregationRound
);
return toJson(Collections.singletonList(aggregator));
}
private static Map<String, Object> buildAggregatorObject(
String aggregationType,
String aggregationFieldName,
boolean aggregationRound
)
{
return ImmutableMap.of(
"type", aggregationType,
"name", "sketch",
"fieldName", aggregationFieldName,
"round", aggregationRound
);
}
private static String buildGroupByQueryJson(
String aggregationType,
String aggregationFieldName,
boolean aggregationRound
)
{
Map<String, Object> aggregation = buildAggregatorObject(
aggregationType,
aggregationFieldName,
aggregationRound
);
Map<String, Object> object = new ImmutableMap.Builder<String, Object>()
.put("queryType", "groupBy")
.put("dataSource", "test_dataSource")
.put("granularity", "ALL")
.put("dimensions", Collections.emptyList())
.put("aggregations", Collections.singletonList(aggregation))
.put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"))
.build();
return toJson(object);
}
}

File: HllSketchMergeAggregatorFactoryTest.java (new file)

@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import com.yahoo.sketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HllSketchMergeAggregatorFactoryTest
{
private static final String NAME = "name";
private static final String FIELD_NAME = "fieldName";
private static final int LG_K = 2;
private static final String TGT_HLL_TYPE = TgtHllType.HLL_6.name();
private static final boolean ROUND = true;
private HllSketchMergeAggregatorFactory targetRound;
private HllSketchMergeAggregatorFactory targetNoRound;
@Before
public void setUp()
{
targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND);
targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, !ROUND);
}
@Test(expected = AggregatorFactoryNotMergeableException.class)
public void testGetMergingFactoryBadName() throws Exception
{
HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory(
NAME + "-diff",
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
ROUND
);
targetRound.getMergingFactory(other);
}
@Test(expected = AggregatorFactoryNotMergeableException.class)
public void testGetMergingFactoryBadType() throws Exception
{
HllSketchBuildAggregatorFactory other = new HllSketchBuildAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
ROUND
);
targetRound.getMergingFactory(other);
}
@Test
public void testGetMergingFactoryOtherSmallerLgK() throws Exception
{
final int smallerLgK = LG_K - 1;
HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory(
NAME,
FIELD_NAME,
smallerLgK,
TGT_HLL_TYPE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
Assert.assertEquals(LG_K, result.getLgK());
}
@Test
public void testGetMergingFactoryOtherLargerLgK() throws Exception
{
final int largerLgK = LG_K + 1;
HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory(
NAME,
FIELD_NAME,
largerLgK,
TGT_HLL_TYPE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
Assert.assertEquals(largerLgK, result.getLgK());
}
@Test
public void testGetMergingFactoryOtherSmallerTgtHllType() throws Exception
{
String smallerTgtHllType = TgtHllType.HLL_4.name();
HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
smallerTgtHllType,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
Assert.assertEquals(TGT_HLL_TYPE, result.getTgtHllType());
}
@Test
public void testGetMergingFactoryOtherLargerTgtHllType() throws Exception
{
String largerTgtHllType = TgtHllType.HLL_8.name();
HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
largerTgtHllType,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
Assert.assertEquals(largerTgtHllType, result.getTgtHllType());
}
@Test
public void testGetMergingFactoryThisNoRoundOtherNoRound() throws Exception
{
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetNoRound.getMergingFactory(targetNoRound);
Assert.assertFalse(result.isRound());
}
@Test
public void testGetMergingFactoryThisNoRoundOtherRound() throws Exception
{
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetNoRound.getMergingFactory(targetRound);
Assert.assertTrue(result.isRound());
}
@Test
public void testGetMergingFactoryThisRoundOtherNoRound() throws Exception
{
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(targetNoRound);
Assert.assertTrue(result.isRound());
}
@Test
public void testGetMergingFactoryThisRoundOtherRound() throws Exception
{
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(targetRound);
Assert.assertTrue(result.isRound());
}
}

File: HllSketchSqlAggregatorTest.java

@@ -88,6 +88,7 @@ import java.util.Map;
 public class HllSketchSqlAggregatorTest extends CalciteTestBase
 {
   private static final String DATA_SOURCE = "foo";
+  private static final boolean ROUND = true;

   private static QueryRunnerFactoryConglomerate conglomerate;
   private static Closer resourceCloser;
@@ -140,7 +141,8 @@
                 "hllsketch_dim1",
                 "dim1",
                 null,
-                null
+                null,
+                ROUND
             )
         )
         .withRollup(false)
@@ -265,14 +267,16 @@
                 "a1",
                 "dim2",
                 null,
-                null
+                null,
+                ROUND
             ),
             new FilteredAggregatorFactory(
                 new HllSketchBuildAggregatorFactory(
                     "a2",
                     "dim2",
                     null,
-                    null
+                    null,
+                    ROUND
                 ),
                 BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
             ),
@@ -280,16 +284,18 @@
                 "a3",
                 "v0",
                 null,
-                null
+                null,
+                ROUND
             ),
             new HllSketchBuildAggregatorFactory(
                 "a4",
                 "v1",
                 null,
-                null
+                null,
+                ROUND
             ),
-            new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8"),
-            new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null)
+            new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", ROUND),
+            new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND)
         )
     )
     .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
@@ -306,7 +312,11 @@
     final String sql = "SELECT\n"
                        + " AVG(u)\n"
-                       + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT_DS_HLL(cnt) AS u FROM druid.foo GROUP BY 1)";
+                       + "FROM ("
+                       + "  SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT_DS_HLL(cnt) AS u\n"
+                       + "  FROM druid.foo\n"
+                       + "  GROUP BY 1\n"
+                       + ")";

     // Verify results
     final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
@@ -351,7 +361,8 @@
                     "a0:a",
                     "cnt",
                     null,
-                    null
+                    null,
+                    ROUND
                 )
             )
         )
@@ -390,4 +401,22 @@
     // Verify query
     Assert.assertEquals(expected, actual);
   }
@Test
public void testApproxCountDistinctHllSketchIsRounded() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT"
+ " dim2,"
+ " APPROX_COUNT_DISTINCT_DS_HLL(m1)"
+ " FROM druid.foo"
+ " GROUP BY dim2"
+ " HAVING APPROX_COUNT_DISTINCT_DS_HLL(m1) = 2";
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final int expected = NullHandling.replaceWithDefault() ? 1 : 2;
Assert.assertEquals(expected, results.size());
}
}