input type validation for datasketches hll "build" aggregator factory (#12131)

* Ingestion will fail for HLLSketchBuild instead of creating with incorrect values

* Addressing review comments for HLL< updated error message introduced test case
This commit is contained in:
somu-imply 2022-01-11 12:00:14 -08:00 committed by GitHub
parent eb0bae49ec
commit 08fea7a46a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 0 deletions

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
@ -30,7 +31,9 @@ import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -55,6 +58,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
super(name, fieldName, lgK, tgtHllType, round); super(name, fieldName, lgK, tgtHllType, round);
} }
@Override @Override
public ColumnType getIntermediateType() public ColumnType getIntermediateType()
{ {
@ -71,6 +75,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
public Aggregator factorize(final ColumnSelectorFactory columnSelectorFactory) public Aggregator factorize(final ColumnSelectorFactory columnSelectorFactory)
{ {
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
return new HllSketchBuildAggregator(selector, getLgK(), TgtHllType.valueOf(getTgtHllType())); return new HllSketchBuildAggregator(selector, getLgK(), TgtHllType.valueOf(getTgtHllType()));
} }
@ -78,6 +83,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory) public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory)
{ {
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
return new HllSketchBuildBufferAggregator( return new HllSketchBuildBufferAggregator(
selector, selector,
getLgK(), getLgK(),
@ -95,6 +101,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
@Override @Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{ {
validateInputs(selectorFactory.getColumnCapabilities(getFieldName()));
return new HllSketchBuildVectorAggregator( return new HllSketchBuildVectorAggregator(
selectorFactory, selectorFactory,
getFieldName(), getFieldName(),
@ -114,4 +121,19 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
return HllSketch.getMaxUpdatableSerializationBytes(getLgK(), TgtHllType.valueOf(getTgtHllType())); return HllSketch.getMaxUpdatableSerializationBytes(getLgK(), TgtHllType.valueOf(getTgtHllType()));
} }
private void validateInputs(@Nullable ColumnCapabilities capabilities)
{
if (capabilities != null) {
if (capabilities.is(ValueType.COMPLEX)) {
throw new ISE(
"Invalid input [%s] of type [%s] for [%s] aggregator [%s]",
getFieldName(),
capabilities.asTypeString(),
HllSketchModule.BUILD_TYPE_NAME,
getName()
);
}
}
}
} }

View File

@ -143,6 +143,35 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
Assert.assertEquals(200, (double) row.get(0), 0.1); Assert.assertEquals(200, (double) row.get(0), 0.1);
} }
@Test
public void unsuccessfulComplexTypesInHLL() throws Exception
{
String metricSpec = "[{"
+ "\"type\": \"hyperUnique\","
+ "\"name\": \"index_hll\","
+ "\"fieldName\": \"id\""
+ "}]";
try {
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("hll/hll_sketches.tsv").getFile()),
buildParserJson(
Arrays.asList("dim", "multiDim", "id"),
Arrays.asList("timestamp", "dim", "multiDim", "id")
),
metricSpec,
0, // minTimestamp
Granularities.NONE,
200, // maxRowCount
buildGroupByQueryJson("HLLSketchBuild", "index_hll", !ROUND)
);
}
catch (RuntimeException e) {
Assert.assertTrue(
e.getMessage().contains("Invalid input [index_hll] of type [COMPLEX<hyperUnique>] for [HLLSketchBuild]"));
}
}
@Test @Test
public void buildSketchesAtQueryTimeMultiValue() throws Exception public void buildSketchesAtQueryTimeMultiValue() throws Exception
{ {