mirror of https://github.com/apache/druid.git
adjust topn heap algorithm to only use known cardinality path when dictionary is unique (#11186)
* adjust topn heap algorithm to only use known cardinality path when dictionary is unique * better check and add comment * adjust comment more
This commit is contained in:
parent
51f983101e
commit
6b272c857f
|
@ -42,7 +42,6 @@ public class HeapBasedTopNAlgorithm
|
|||
)
|
||||
{
|
||||
super(storageAdapter);
|
||||
|
||||
this.query = query;
|
||||
}
|
||||
|
||||
|
|
|
@ -87,10 +87,6 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
|
|||
Map<Comparable<?>, Aggregator[]> aggregatesStore
|
||||
)
|
||||
{
|
||||
if (params.getCardinality() < 0) {
|
||||
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
|
||||
}
|
||||
|
||||
final Cursor cursor = params.getCursor();
|
||||
final DimensionSelector dimSelector = params.getDimSelector();
|
||||
|
||||
|
|
|
@ -202,7 +202,7 @@ public class TopNQueryEngine
|
|||
}
|
||||
|
||||
/**
|
||||
* {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since the
|
||||
* {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
|
||||
* which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
|
||||
* query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
|
||||
* {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.druid.segment.DimensionDictionarySelector;
|
|||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.DimensionSelector;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
|
||||
|
@ -38,18 +39,25 @@ import java.util.function.Function;
|
|||
|
||||
public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
|
||||
{
|
||||
private final ColumnCapabilities capabilities;
|
||||
private final Function<Object, Comparable<?>> dimensionValueConverter;
|
||||
private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
|
||||
|
||||
public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
|
||||
public StringTopNColumnAggregatesProcessor(final ColumnCapabilities capabilities, final ValueType dimensionType)
|
||||
{
|
||||
this.capabilities = capabilities;
|
||||
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCardinality(DimensionSelector selector)
|
||||
{
|
||||
return selector.getValueCardinality();
|
||||
// only report the underlying selector cardinality if the column the selector is for is dictionary encoded, and
|
||||
// the dictionary values are unique, that is they have a 1:1 mapping between dictionaryId and column value
|
||||
if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) {
|
||||
return selector.getValueCardinality();
|
||||
}
|
||||
return DimensionDictionarySelector.CARDINALITY_UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,7 +116,18 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
|
|||
Aggregator[][] rowSelector
|
||||
)
|
||||
{
|
||||
if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
|
||||
final boolean notUnknown = selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN;
|
||||
final boolean unique = capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue();
|
||||
// we must know cardinality to use array based aggregation
|
||||
// we check for uniquely dictionary encoded values because non-unique (meaning dictionary ids do not have a 1:1
|
||||
// relation with values) negates many of the benefits of array aggregation:
|
||||
// - if different dictionary ids map to the same value but dictionary ids are unique to that value (*:1), then
|
||||
// array aggregation will be correct but will still have to potentially perform many map lookups and lose the
|
||||
// performance benefit array aggregation is trying to provide
|
||||
// - in cases where the same dictionary ids map to different values (1:* or *:*), results can be entirely
|
||||
// incorrect since an aggregator for a different value might be chosen from the array based on the re-used
|
||||
// dictionary id
|
||||
if (notUnknown && unique) {
|
||||
return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
|
||||
} else {
|
||||
return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
|
||||
|
|
|
@ -48,7 +48,7 @@ public class TopNColumnAggregatesProcessorFactory
|
|||
final ValueType selectorType = capabilities.getType();
|
||||
|
||||
if (selectorType.equals(ValueType.STRING)) {
|
||||
return new StringTopNColumnAggregatesProcessor(dimensionType);
|
||||
return new StringTopNColumnAggregatesProcessor(capabilities, dimensionType);
|
||||
} else if (selectorType.isNumeric()) {
|
||||
final Function<Object, Comparable<?>> converter;
|
||||
final ValueType strategyType;
|
||||
|
|
|
@ -16928,6 +16928,45 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters(source = QueryContextForJoinProvider.class)
|
||||
public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map<String, Object> queryContext) throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT druid.broadcast.dim4, COUNT(*)\n"
|
||||
+ "FROM druid.numfoo\n"
|
||||
+ "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
|
||||
+ "GROUP BY 1 ORDER BY 1 DESC LIMIT 4",
|
||||
queryContext,
|
||||
ImmutableList.of(
|
||||
new TopNQueryBuilder()
|
||||
.dataSource(
|
||||
join(
|
||||
new TableDataSource(CalciteTests.DATASOURCE3),
|
||||
new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
|
||||
"j0.",
|
||||
equalsCondition(
|
||||
DruidExpression.fromColumn("dim4"),
|
||||
DruidExpression.fromColumn("j0.dim4")
|
||||
),
|
||||
JoinType.INNER
|
||||
)
|
||||
)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
|
||||
.threshold(4)
|
||||
.aggregators(aggregators(new CountAggregatorFactory("a0")))
|
||||
.context(queryContext)
|
||||
.metric(new InvertedTopNMetricSpec(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)))
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{"b", 9L},
|
||||
new Object[]{"a", 9L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeStampAddZeroDayPeriod() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue