Refactor HllSketchBuildAggregatorFactory (#14544)

* Refactor HllSketchBuildAggregatorFactory

The usage of ColumnProcessors and HllSketchBuildColumnProcessorFactory
made it very difficult to figure out what was going on from just looking
at the AggregatorFactory or Aggregator code.  It also didn't properly
double check that you could use UTF8 ahead of time, even though it's
entirely possible to validate it before trying to use it.  This refactor
makes keeps the general indirection that had been implemented by
the Consumer<Supplier<HllSketch>> but centralizes the decision logic and
makes it easier to understand the code.

* Test fixes

* Add test that validates the types are maintained

* Add back indirection to avoid buffer calls

* Cover floats and doubles are the same thing

* Static checks
This commit is contained in:
imply-cheddar 2023-07-11 01:57:09 +09:00 committed by GitHub
parent c3f84f9ea0
commit 66cac08a52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 307 additions and 232 deletions

View File

@ -23,25 +23,22 @@ import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.Aggregator;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
public class HllSketchBuildAggregator implements Aggregator
{
private final Consumer<Supplier<HllSketch>> processor;
private final HllSketchUpdater updater;
private HllSketch sketch;
public HllSketchBuildAggregator(
final Consumer<Supplier<HllSketch>> processor,
final HllSketchUpdater updater,
final int lgK,
final TgtHllType tgtHllType
)
{
this.processor = processor;
this.updater = updater;
this.sketch = new HllSketch(lgK, tgtHllType);
}
@ -53,7 +50,7 @@ public class HllSketchBuildAggregator implements Aggregator
@Override
public synchronized void aggregate()
{
processor.accept(() -> sketch);
updater.update(() -> sketch);
}
/*

View File

@ -30,22 +30,25 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.nio.ByteBuffer;
/**
* This aggregator factory is for building sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
@SuppressWarnings("NullableProblems")
public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
{
public static final ColumnType TYPE = ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME);
@ -80,16 +83,8 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
@Override
public Aggregator factorize(final ColumnSelectorFactory columnSelectorFactory)
{
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
final Consumer<Supplier<HllSketch>> processor = ColumnProcessors.makeProcessor(
getFieldName(),
new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
columnSelectorFactory
);
return new HllSketchBuildAggregator(
processor,
formulateSketchUpdater(columnSelectorFactory),
getLgK(),
TgtHllType.valueOf(getTgtHllType())
);
@ -98,16 +93,8 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
@Override
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
{
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
final Consumer<Supplier<HllSketch>> processor = ColumnProcessors.makeProcessor(
getFieldName(),
new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
columnSelectorFactory
);
return new HllSketchBuildBufferAggregator(
processor,
formulateSketchUpdater(columnSelectorFactory),
getLgK(),
TgtHllType.valueOf(getTgtHllType()),
getStringEncoding(),
@ -175,4 +162,74 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
}
}
private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSelectorFactory)
{
final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(getFieldName());
validateInputs(capabilities);
HllSketchUpdater updater = null;
if (capabilities != null &&
StringEncoding.UTF8.equals(getStringEncoding()) && ValueType.STRING.equals(capabilities.getType())) {
final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector(
DefaultDimensionSpec.of(getFieldName())
);
if (selector.supportsLookupNameUtf8()) {
updater = sketch -> {
final IndexedInts row = selector.getRow();
final int sz = row.size();
for (int i = 0; i < sz; i++) {
final ByteBuffer buf = selector.lookupNameUtf8(row.get(i));
if (buf != null) {
sketch.get().update(buf);
}
}
};
}
}
if (updater == null) {
@SuppressWarnings("unchecked")
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
final ValueType type;
if (capabilities == null) {
// When ingesting data, the columnSelectorFactory returns null for column capabilities, so this doesn't
// necessarily mean that the column doesn't exist. We thus need to be prepared to accept anything in this
// case. As such, we pretend like the input is COMPLEX to get the logic to use the object-based aggregation
type = ValueType.COMPLEX;
} else {
type = capabilities.getType();
}
switch (type) {
case LONG:
updater = sketch -> {
if (!selector.isNull()) {
sketch.get().update(selector.getLong());
}
};
break;
case FLOAT:
case DOUBLE:
updater = sketch -> {
if (!selector.isNull()) {
sketch.get().update(selector.getDouble());
}
};
break;
default:
updater = sketch -> {
Object obj = selector.getObject();
if (obj != null) {
HllSketchBuildUtil.updateSketch(sketch.get(), getStringEncoding(), obj);
}
};
}
}
return updater;
}
}

View File

@ -19,35 +19,33 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
@SuppressWarnings("NullableProblems")
public class HllSketchBuildBufferAggregator implements BufferAggregator
{
private final Consumer<Supplier<HllSketch>> processor;
private final HllSketchUpdater updater;
private final HllSketchBuildBufferAggregatorHelper helper;
private final StringEncoding stringEncoding;
public HllSketchBuildBufferAggregator(
final Consumer<Supplier<HllSketch>> processor,
final HllSketchUpdater updater,
final int lgK,
final TgtHllType tgtHllType,
final StringEncoding stringEncoding,
final int size
)
{
this.processor = processor;
this.updater = updater;
this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
this.stringEncoding = stringEncoding;
}
@ -61,7 +59,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
processor.accept(() -> helper.getSketchAtPosition(buf, position));
updater.update(() -> helper.getSketchAtPosition(buf, position));
}
@Override
@ -101,7 +99,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("processor", processor);
inspector.visit("processor", updater);
// lgK should be inspected because different execution paths exist in HllSketch.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028

View File

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnProcessorFactory;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* Scalar (non-vectorized) column processor factory.
*/
public class HllSketchBuildColumnProcessorFactory implements ColumnProcessorFactory<Consumer<Supplier<HllSketch>>>
{
private final StringEncoding stringEncoding;
HllSketchBuildColumnProcessorFactory(StringEncoding stringEncoding)
{
this.stringEncoding = stringEncoding;
}
@Override
public ColumnType defaultType()
{
return ColumnType.STRING;
}
@Override
public Consumer<Supplier<HllSketch>> makeDimensionProcessor(DimensionSelector selector, boolean multiValue)
{
return sketch -> {
final IndexedInts row = selector.getRow();
final int sz = row.size();
for (int i = 0; i < sz; i++) {
HllSketchBuildUtil.updateSketchWithDictionarySelector(sketch.get(), stringEncoding, selector, row.get(i));
}
};
}
@Override
public Consumer<Supplier<HllSketch>> makeFloatProcessor(BaseFloatColumnValueSelector selector)
{
return sketch -> {
if (!selector.isNull()) {
// Important that this is *double* typed, since HllSketchBuildAggregator treats doubles and floats the same.
final double value = selector.getFloat();
sketch.get().update(value);
}
};
}
@Override
public Consumer<Supplier<HllSketch>> makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
{
return sketch -> {
if (!selector.isNull()) {
sketch.get().update(selector.getDouble());
}
};
}
@Override
public Consumer<Supplier<HllSketch>> makeLongProcessor(BaseLongColumnValueSelector selector)
{
return sketch -> {
if (!selector.isNull()) {
sketch.get().update(selector.getLong());
}
};
}
@Override
public Consumer<Supplier<HllSketch>> makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
{
return sketch -> {
final Object o = selector.getObject();
if (o != null) {
HllSketchBuildUtil.updateSketch(sketch.get(), stringEncoding, o);
}
};
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import java.util.function.Supplier;
public interface HllSketchUpdater
{
void update(Supplier<HllSketch> sketch);
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
@ -46,6 +47,7 @@ import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchUnionPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
@ -109,26 +111,36 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"[2.000000004967054,2.0,2.000099863468538]",
"\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"",
2L,
"### HLL SKETCH SUMMARY: \n"
+ " Log Config K : 12\n"
+ " Hll Target : HLL_4\n"
+ " Current Mode : LIST\n"
+ " Memory : false\n"
+ " LB : 2.0\n"
+ " Estimate : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OutOfOrder Flag: false\n"
+ " Coupon Count : 2\n",
"### HLL SKETCH SUMMARY: \n"
+ " LOG CONFIG K : 12\n"
+ " HLL TARGET : HLL_4\n"
+ " CURRENT MODE : LIST\n"
+ " MEMORY : FALSE\n"
+ " LB : 2.0\n"
+ " ESTIMATE : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OUTOFORDER FLAG: FALSE\n"
+ " COUPON COUNT : 2\n",
Joiner.on("\n").join(
new Object[]{
"### HLL SKETCH SUMMARY: ",
" Log Config K : 12",
" Hll Target : HLL_4",
" Current Mode : LIST",
" Memory : false",
" LB : 2.0",
" Estimate : 2.000000004967054",
" UB : 2.000099863468538",
" OutOfOrder Flag: false",
" Coupon Count : 2",
""
}
),
Joiner.on("\n").join(
new Object[]{
"### HLL SKETCH SUMMARY: ",
" LOG CONFIG K : 12",
" HLL TARGET : HLL_4",
" CURRENT MODE : LIST",
" MEMORY : FALSE",
" LB : 2.0",
" ESTIMATE : 2.000000004967054",
" UB : 2.000099863468538",
" OUTOFORDER FLAG: FALSE",
" COUPON COUNT : 2",
""
}
),
2.0,
2L
};
@ -242,7 +254,8 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
) throws IOException
{
HllSketchModule.registerSerde();
final QueryableIndex index = IndexBuilder.create()
final QueryableIndex index = IndexBuilder
.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
@ -250,32 +263,19 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HllSketchBuildAggregatorFactory(
"hllsketch_dim1",
"dim1",
null,
null,
null,
false,
ROUND
),
new HllSketchBuildAggregatorFactory(
"hllsketch_dim3",
"dim3",
null,
null,
null,
false,
false
)
new HllSketchBuildAggregatorFactory("hllsketch_dim1", "dim1", null, null, null, false, ROUND),
new HllSketchBuildAggregatorFactory("hllsketch_dim3", "dim3", null, null, null, false, false),
new HllSketchBuildAggregatorFactory("hllsketch_m1", "m1", null, null, null, false, ROUND),
new HllSketchBuildAggregatorFactory("hllsketch_f1", "f1", null, null, null, false, ROUND),
new HllSketchBuildAggregatorFactory("hllsketch_l1", "l1", null, null, null, false, ROUND),
new HllSketchBuildAggregatorFactory("hllsketch_d1", "d1", null, null, null, false, ROUND)
)
.withRollup(false)
.build()
)
.rows(TestDataBuilder.ROWS1)
.rows(TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS)
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
@ -473,7 +473,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDataSource(CalciteTests.DATASOURCE1)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "_d0")))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
aggregators(
@ -518,7 +518,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setInterval(new MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
.setDimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG))
.setDimensions(new DefaultDimensionSpec("cnt", "_d0", ColumnType.LONG))
.setAggregatorSpecs(EXPECTED_FILTERED_AGGREGATORS)
.setPostAggregatorSpecs(EXPECTED_FILTERED_POST_AGGREGATORS)
.setContext(QUERY_CONTEXT_DEFAULT)
@ -613,7 +613,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setInterval(new MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
.setDimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG))
.setDimensions(new DefaultDimensionSpec("cnt", "_d0", ColumnType.LONG))
.setAggregatorSpecs(EXPECTED_PA_AGGREGATORS)
.setPostAggregatorSpecs(EXPECTED_PA_POST_AGGREGATORS)
.setContext(QUERY_CONTEXT_DEFAULT)
@ -898,7 +898,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
@ -957,7 +957,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
@ -982,30 +982,33 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
{
testQuery(
"SELECT"
+ " HLL_SKETCH_ESTIMATE(hllsketch_dim1)"
+ " HLL_SKETCH_ESTIMATE(hllsketch_dim1),"
+ " HLL_SKETCH_ESTIMATE(hllsketch_d1),"
+ " HLL_SKETCH_ESTIMATE(hllsketch_l1),"
+ " HLL_SKETCH_ESTIMATE(hllsketch_f1)"
+ " FROM druid.foo",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(new ExpressionVirtualColumn(
"v0",
"hll_sketch_estimate(\"hllsketch_dim1\")",
ColumnType.DOUBLE,
MACRO_TABLE
))
.virtualColumns(
makeSketchEstimateExpression("v0", "hllsketch_dim1"),
makeSketchEstimateExpression("v1", "hllsketch_d1"),
makeSketchEstimateExpression("v2", "hllsketch_l1"),
makeSketchEstimateExpression("v3", "hllsketch_f1")
)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.columns("v0")
.columns("v0", "v1", "v2", "v3")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{0.0D},
new Object[]{1.0D},
new Object[]{1.0D},
new Object[]{1.0D},
new Object[]{1.0D},
new Object[]{1.0D}
new Object[]{0.0D, 1.0D, 1.0D, 1.0D},
new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
new Object[]{1.0D, 0.0D, 0.0D, 0.0D}
)
);
}
@ -1097,14 +1100,9 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDataSource(CalciteTests.DATASOURCE1)
.setGranularity(Granularities.ALL)
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
"hll_sketch_estimate(\"hllsketch_dim1\")",
ColumnType.DOUBLE,
MACRO_TABLE
))
.setVirtualColumns(makeSketchEstimateExpression("v0", "hllsketch_dim1"))
.setDimensions(
new DefaultDimensionSpec("v0", "d0", ColumnType.DOUBLE))
new DefaultDimensionSpec("v0", "_d0", ColumnType.DOUBLE))
.setAggregatorSpecs(
aggregators(
new CountAggregatorFactory("a0")
@ -1146,13 +1144,8 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("v0", "d0", ColumnType.DOUBLE))
.virtualColumns(new ExpressionVirtualColumn(
"v0",
"hll_sketch_estimate(\"hllsketch_dim1\")",
ColumnType.DOUBLE,
MACRO_TABLE
))
.dimension(new DefaultDimensionSpec("v0", "_d0", ColumnType.DOUBLE))
.virtualColumns(makeSketchEstimateExpression("v0", "hllsketch_dim1"))
.metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
.threshold(2)
.aggregators(new CountAggregatorFactory("a0"))
@ -1165,4 +1158,115 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
)
);
}
/**
* This is an extremely subtle test, so we explain with a comment. The `m1` column in the input data looks like
* `["1.0", "2.0", "3.0", "4.0", "5.0", "6.0"]` while the `d1` column looks like
* `[1.0, 1.7, 0.0]`. That is, "m1" is numbers-as-strings, while d1 is numbers-as-numbers. If you take the
* uniques across both columns, you expect no overlap, so 9 entries. However, if the `1.0` from `d1` gets
* converted into `"1.0"` or vice-versa, the result can become 8 because then the sketch will hash the same
* value multiple times considering them duplicates. This test validates that the aggregator properly builds
* the sketches preserving the initial type of the data as it came in. Specifically, the test was added when
* a code change caused the 1.0 to get converted to a String such that the resulting value of the query was 8
* instead of 9.
*/
@Test
public void testEstimateStringAndDoubleAreDifferent()
{
testQuery(
"SELECT"
+ " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1), DS_HLL(hllsketch_m1)), true)"
+ " FROM druid.foo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
new HllSketchMergeAggregatorFactory("a0", "hllsketch_d1", null, null, null, false, true),
new HllSketchMergeAggregatorFactory("a1", "hllsketch_m1", null, null, null, false, true)
)
.postAggregators(
new HllSketchToEstimatePostAggregator(
"p3",
new HllSketchUnionPostAggregator(
"p2",
Arrays.asList(
new FieldAccessPostAggregator("p0", "a0"),
new FieldAccessPostAggregator("p1", "a1")
),
null,
null
),
true
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{9.0D}
)
);
}
/**
* This is a test in a similar vein to {@link #testEstimateStringAndDoubleAreDifferent()} except here we are
* ensuring that float values and doubles values are considered equivalent. The expected initial inputs were
* <p>
* 1. d1 -> [1.0, 1.7, 0.0]
* 2. f1 -> [1.0f, 0.1f, 0.0f]
* <p>
* If we assume that doubles and floats are the same, that means that there are 4 unique values, not 6
*/
@Test
public void testFloatAndDoubleAreConsideredTheSame()
{
// This is a test in a similar vein to testEstimateStringAndDoubleAreDifferent above
testQuery(
"SELECT"
+ " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1), DS_HLL(hllsketch_f1)), true)"
+ " FROM druid.foo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
new HllSketchMergeAggregatorFactory("a0", "hllsketch_d1", null, null, null, false, true),
new HllSketchMergeAggregatorFactory("a1", "hllsketch_f1", null, null, null, false, true)
)
.postAggregators(
new HllSketchToEstimatePostAggregator(
"p3",
new HllSketchUnionPostAggregator(
"p2",
Arrays.asList(
new FieldAccessPostAggregator("p0", "a0"),
new FieldAccessPostAggregator("p1", "a1")
),
null,
null
),
true
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{4.0D}
)
);
}
private ExpressionVirtualColumn makeSketchEstimateExpression(String outputName, String field)
{
return new ExpressionVirtualColumn(
outputName,
StringUtils.format("hll_sketch_estimate(\"%s\")", field),
ColumnType.DOUBLE,
MACRO_TABLE
);
}
}