mirror of https://github.com/apache/druid.git
Fix vectorized cardinality bug on certain string columns. (#11199)
* Fix vectorized cardinality bug on certain string columns.

Fixes a bug introduced in #11182, related to the fact that in some cases,
ColumnProcessors.makeVectorProcessor will call "makeObjectProcessor" instead of
"makeSingleValueDimensionProcessor" or "makeMultiValueDimensionProcessor".
CardinalityVectorProcessorFactory improperly ignored calls to "makeObjectProcessor".

In addition to fixing the bug, I added this detail to the javadocs for
VectorColumnProcessorFactory, which did not previously call out this case, to
prevent others from running into the same thing in the future.

* Improve test coverage.

* Additional fixes.
This commit is contained in:
parent
a8c00d8d9b
commit
a1f850d707
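The bug the commit message describes is a dispatch problem: ColumnProcessors.makeVectorProcessor routes a STRING column to the object path whenever the column's dictionary is missing or not expected to be useful (for example, an expression virtual column), so a factory that stubs out makeObjectProcessor silently drops those values. Below is a minimal sketch of that routing; the Factory interface and boolean flags are hypothetical stand-ins, not Druid's real types.

// Simplified sketch of the dispatch described above. Hypothetical types,
// not the actual ColumnProcessors.makeVectorProcessor implementation.
interface Factory<T>
{
  T makeSingleValueDimensionProcessor();

  T makeMultiValueDimensionProcessor();

  T makeObjectProcessor();
}

final class Dispatch
{
  static <T> T makeVectorProcessor(
      boolean isString,
      boolean hasUsefulDictionary,
      boolean hasMultipleValues,
      Factory<T> factory
  )
  {
    if (isString && hasUsefulDictionary) {
      // Dictionary-encoded strings use the dimension processors.
      return hasMultipleValues
             ? factory.makeMultiValueDimensionProcessor()
             : factory.makeSingleValueDimensionProcessor();
    } else {
      // Strings without a useful dictionary (for example, expression virtual
      // columns) and complex types take the object path. Returning a no-op
      // processor here is the mistake this commit fixes.
      return factory.makeObjectProcessor();
    }
  }
}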
@@ -69,6 +69,7 @@ public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory
   @Override
   public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
   {
-    return NilCardinalityVectorProcessor.INSTANCE;
+    // Handles string-as-object and complex types.
+    return new StringObjectCardinalityVectorProcessor(selector);
   }
 }
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.cardinality.vector;
-
-import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
-
-public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor
-{
-  public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor();
-
-  @Override
-  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
-  {
-    // Do nothing.
-  }
-
-  @Override
-  public void aggregate(
-      ByteBuffer buf,
-      int numRows,
-      int[] positions,
-      @Nullable int[] rows,
-      int positionOffset
-  )
-  {
-    // Do nothing.
-  }
-}
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.cardinality.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class StringObjectCardinalityVectorProcessor implements CardinalityVectorProcessor
+{
+  private final VectorObjectSelector selector;
+
+  public StringObjectCardinalityVectorProcessor(final VectorObjectSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final Object[] vector = selector.getObjectVector();
+
+      buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+      buf.position(position);
+
+      final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+
+      for (int i = startRow; i < endRow; i++) {
+        addObjectIfString(collector, vector[i]);
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+  {
+    // Save position, limit and restore later instead of allocating a new ByteBuffer object
+    final int oldPosition = buf.position();
+    final int oldLimit = buf.limit();
+
+    try {
+      final Object[] vector = selector.getObjectVector();
+
+      for (int i = 0; i < numRows; i++) {
+        final Object obj = vector[rows != null ? rows[i] : i];
+
+        if (NullHandling.replaceWithDefault() || obj != null) {
+          final int position = positions[i] + positionOffset;
+          buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
+          buf.position(position);
+          final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
+          addObjectIfString(collector, obj);
+        }
+      }
+    }
+    finally {
+      buf.limit(oldLimit);
+      buf.position(oldPosition);
+    }
+  }
+
+  /**
+   * Adds an Object to a HyperLogLogCollector. If the object is a {@code List<String>} or {@code String} then
+   * the individual Strings are added to the collector.
+   *
+   * If the object is any other type (including null) then behavior depends on null-handling mode:
+   *
+   * - In SQL-compatible mode, ignore non-strings and nulls.
+   * - In replace-with-default mode, treat all non-strings and nulls as empty strings.
+   */
+  private static void addObjectIfString(final HyperLogLogCollector collector, @Nullable final Object obj)
+  {
+    if (obj instanceof String) {
+      StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, (String) obj);
+    } else if (obj instanceof List) {
+      //noinspection unchecked
+      for (String s : (List<String>) obj) {
+        StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s);
+      }
+    } else {
+      StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, null);
+    }
+  }
+}
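The save-and-restore pattern in both aggregate methods above is plain java.nio. The snippet below is a standalone sketch of the same trick: write through a temporary position/limit window, then restore the buffer so the caller's view is untouched. Note that the limit must be raised before the position is moved, since ByteBuffer.position rejects values beyond the current limit.

import java.nio.ByteBuffer;

public class BufferWindowExample
{
  public static void main(String[] args)
  {
    final ByteBuffer buf = ByteBuffer.allocate(64);

    // Save position and limit instead of allocating a new ByteBuffer object,
    // mirroring the processor above.
    final int oldPosition = buf.position();
    final int oldLimit = buf.limit();

    try {
      // Restrict the buffer to an 8-byte window at offset 16, then write into it.
      buf.limit(16 + Long.BYTES);
      buf.position(16);
      buf.putLong(42L);
    }
    finally {
      // Restore, so callers see the buffer exactly as they passed it in.
      buf.limit(oldLimit);
      buf.position(oldPosition);
    }

    System.out.println(buf.getLong(16)); // prints 42; position and limit are unchanged
  }
}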
@@ -41,8 +41,12 @@ import org.apache.druid.segment.vector.VectorValueSelector;
 public interface VectorColumnProcessorFactory<T>
 {
   /**
-   * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value
+   * Called only if {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value
    * per row.
+   *
+   * Note that for STRING-typed columns where the dictionary does not exist or is not expected to be useful,
+   * {@link #makeObjectProcessor} may be called instead. To handle all string inputs properly, processors must implement
+   * all three methods (single-value, multi-value, object).
    */
   T makeSingleValueDimensionProcessor(
       ColumnCapabilities capabilities,
@@ -50,8 +54,12 @@ public interface VectorColumnProcessorFactory<T>
   );
 
   /**
-   * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values
+   * Called only if {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values
    * per row.
+   *
+   * Note that for STRING-typed columns where the dictionary does not exist or is not expected to be useful,
+   * {@link #makeObjectProcessor} may be called instead. To handle all string inputs properly, processors must implement
+   * all three methods (single-value, multi-value, object).
    */
   T makeMultiValueDimensionProcessor(
       ColumnCapabilities capabilities,
@@ -74,7 +82,8 @@ public interface VectorColumnProcessorFactory<T>
   T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
 
   /**
-   * Called when {@link ColumnCapabilities#getType()} is COMPLEX.
+   * Called when {@link ColumnCapabilities#getType()} is COMPLEX. May also be called for STRING typed columns in
+   * cases where the dictionary does not exist or is not expected to be useful.
    */
   T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector);
 }
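The contract spelled out in these javadocs is why the fix touches all three methods: a factory is only correct if every path produces a working processor, and the object path receives String and List values. As a self-contained toy (not Druid code), the sketch below mirrors the shape of addObjectIfString above, distinguishing String, List, and everything else instead of treating the object path as a no-op.

import java.util.List;

// Toy object-path handler; echoes the String / List / other branching of
// addObjectIfString in the new processor.
final class ToyObjectPathCounter
{
  static int countStrings(Object[] vector)
  {
    int count = 0;
    for (Object obj : vector) {
      if (obj instanceof String) {
        count++;
      } else if (obj instanceof List) {
        count += ((List<?>) obj).size();
      }
      // Other types, including null, are simply skipped in this sketch.
    }
    return count;
  }

  public static void main(String[] args)
  {
    Object[] vector = {"a", List.of("b", "c"), null, 3.14};
    System.out.println(countStrings(vector)); // prints 3
  }
}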
@@ -8855,6 +8855,78 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
     TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg");
   }
 
+  @Test
+  public void testGroupByCardinalityAggOnMultiStringExpression()
+  {
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setVirtualColumns(
+            new ExpressionVirtualColumn("v0", "concat(quality,market)", ValueType.STRING, TestExprMacroTable.INSTANCE)
+        )
+        .setAggregatorSpecs(
+            QueryRunnerTestHelper.ROWS_COUNT,
+            new CardinalityAggregatorFactory(
+                "numVals",
+                ImmutableList.of(DefaultDimensionSpec.of("v0")),
+                false
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Collections.singletonList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "rows",
+            26L,
+            "numVals",
+            13.041435202975777d
+        )
+    );
+
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg");
+  }
+
+  @Test
+  public void testGroupByCardinalityAggOnHyperUnique()
+  {
+    // Cardinality aggregator on complex columns (like hyperUnique) returns 0.
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setAggregatorSpecs(
+            QueryRunnerTestHelper.ROWS_COUNT,
+            new CardinalityAggregatorFactory(
+                "cardinality",
+                ImmutableList.of(DefaultDimensionSpec.of("quality_uniques")),
+                false
+            ),
+            new HyperUniquesAggregatorFactory("hyperUnique", "quality_uniques", false, false)
+        )
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Collections.singletonList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "rows",
+            26L,
+            "cardinality",
+            NullHandling.replaceWithDefault() ? 1.0002442201269182 : 0.0d,
+            "hyperUnique",
+            9.019833517963864d
+        )
+    );
+
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg");
+  }
+
   @Test
   public void testGroupByLongColumn()
   {
@@ -48,9 +48,12 @@ import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
 import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.query.filter.AndDimFilter;
@@ -3035,6 +3038,84 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
     assertExpectedResults(expectedResults, results);
   }
 
+  @Test
+  public void testTimeseriesCardinalityAggOnMultiStringExpression()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+                                  .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+                                  .virtualColumns(
+                                      new ExpressionVirtualColumn("v0", "concat(quality,market)", ValueType.STRING, TestExprMacroTable.INSTANCE)
+                                  )
+                                  .aggregators(
+                                      QueryRunnerTestHelper.ROWS_COUNT,
+                                      new CardinalityAggregatorFactory(
+                                          "numVals",
+                                          ImmutableList.of(DefaultDimensionSpec.of("v0")),
+                                          false
+                                      )
+                                  )
+                                  .granularity(QueryRunnerTestHelper.ALL_GRAN)
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-04-01"),
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "rows",
+                    26L,
+                    "numVals",
+                    13.041435202975777d
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+    assertExpectedResults(expectedResults, results);
+  }
+
+  @Test
+  public void testTimeseriesCardinalityAggOnHyperUnique()
+  {
+    // Cardinality aggregator on complex columns (like hyperUnique) returns 0.
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+                                  .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+                                  .aggregators(
+                                      QueryRunnerTestHelper.ROWS_COUNT,
+                                      new CardinalityAggregatorFactory(
+                                          "cardinality",
+                                          ImmutableList.of(DefaultDimensionSpec.of("quality_uniques")),
+                                          false
+                                      ),
+                                      new HyperUniquesAggregatorFactory("hyperUnique", "quality_uniques", false, false)
+                                  )
+                                  .granularity(QueryRunnerTestHelper.ALL_GRAN)
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-04-01"),
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "rows",
+                    26L,
+                    "cardinality",
+                    NullHandling.replaceWithDefault() ? 1.0002442201269182 : 0.0d,
+                    "hyperUnique",
+                    9.019833517963864d
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
+    assertExpectedResults(expectedResults, results);
+  }
+
   private Map<String, Object> makeContext()
   {
     return makeContext(ImmutableMap.of());
@@ -8010,6 +8010,37 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testApproxCountDistinctOnVectorizableSingleStringExpression() throws Exception
+  {
+    testQuery(
+        "SELECT APPROX_COUNT_DISTINCT(dim1 || 'hello') FROM druid.foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(
+                      expressionVirtualColumn("v0", "concat(\"dim1\",'hello')", ValueType.STRING)
+                  )
+                  .aggregators(
+                      aggregators(
+                          new CardinalityAggregatorFactory(
+                              "a0",
+                              null,
+                              dimensions(DefaultDimensionSpec.of("v0")),
+                              false,
+                              true
+                          )
+                      )
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{6L})
+    );
+  }
+
   @Test
   public void testNestedGroupBy() throws Exception
   {