From 6b272c857f14a79eceefd1ee24957332a49abef5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 10 Jun 2021 16:32:22 -0700 Subject: [PATCH] adjust topn heap algorithm to only use known cardinality path when dictionary is unique (#11186) * adjust topn heap algorithm to only use known cardinality path when dictionary is unique * better check and add comment * adjust comment more --- .../query/topn/HeapBasedTopNAlgorithm.java | 1 - .../topn/TimeExtractionTopNAlgorithm.java | 4 -- .../druid/query/topn/TopNQueryEngine.java | 2 +- .../StringTopNColumnAggregatesProcessor.java | 25 ++++++++++-- .../TopNColumnAggregatesProcessorFactory.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 39 +++++++++++++++++++ 6 files changed, 63 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java index 87f79562fab..14f3b729e1e 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java @@ -42,7 +42,6 @@ public class HeapBasedTopNAlgorithm ) { super(storageAdapter); - this.query = query; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index cee1e3ac32e..b036ba64c95 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -87,10 +87,6 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm, Aggregator[]> aggregatesStore ) { - if (params.getCardinality() < 0) { - throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality"); - } - final Cursor cursor = params.getCursor(); final DimensionSelector dimSelector = params.getDimSelector(); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index daad69fc7a6..ad03f6ae3a8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -202,7 +202,7 @@ public class TopNQueryEngine } /** - * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since the + * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since * which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the * query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the * {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 92b2e8a3fe9..eacbde0863f 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.IndexedInts; @@ -38,18 +39,25 @@ import java.util.function.Function; public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor { + private final ColumnCapabilities capabilities; private final Function> dimensionValueConverter; private HashMap, Aggregator[]> aggregatesStore; - public StringTopNColumnAggregatesProcessor(final ValueType dimensionType) + public StringTopNColumnAggregatesProcessor(final ColumnCapabilities capabilities, final ValueType dimensionType) { + this.capabilities = capabilities; this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType); } @Override public int getCardinality(DimensionSelector selector) { - return selector.getValueCardinality(); + // only report the underlying selector cardinality if the column the selector is for is dictionary encoded, and + // the dictionary values are unique, that is they have a 1:1 mapping between dictionaryId and column value + if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) { + return selector.getValueCardinality(); + } + return DimensionDictionarySelector.CARDINALITY_UNKNOWN; } @Override @@ -108,7 +116,18 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates Aggregator[][] rowSelector ) { - if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) { + final boolean notUnknown = selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN; + final boolean unique = capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue(); + // we must know cardinality to use array based aggregation + // we check for uniquely dictionary encoded values because non-unique (meaning dictionary ids do not have a 1:1 + // relation with values) negates many of the benefits of array aggregation: + // - if different dictionary ids map to the same value but dictionary ids are unique to that value (*:1), then + // array aggregation will be correct but will still have to potentially perform many map lookups and lose the + // performance benefit array aggregation is trying to provide + // - in cases where the same dictionary ids map to different values (1:* or *:*), results can be entirely + // incorrect since an aggregator for a different value might be chosen from the array based on the re-used + // dictionary id + if (notUnknown && unique) { return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector); } else { return scanAndAggregateWithCardinalityUnknown(query, cursor, selector); diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java index 56a29433fcb..be6eb08f891 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java @@ -48,7 +48,7 @@ public class TopNColumnAggregatesProcessorFactory final ValueType selectorType = capabilities.getType(); if (selectorType.equals(ValueType.STRING)) { - return new StringTopNColumnAggregatesProcessor(dimensionType); + return new StringTopNColumnAggregatesProcessor(capabilities, dimensionType); } else if (selectorType.isNumeric()) { final Function> converter; final ValueType strategyType; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 1e32e765ab5..f724cd0797e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -16928,6 +16928,45 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map queryContext) throws Exception + { + testQuery( + "SELECT druid.broadcast.dim4, COUNT(*)\n" + + "FROM druid.numfoo\n" + + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n" + + "GROUP BY 1 ORDER BY 1 DESC LIMIT 4", + queryContext, + ImmutableList.of( + new TopNQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE3), + new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE), + "j0.", + equalsCondition( + DruidExpression.fromColumn("dim4"), + DruidExpression.fromColumn("j0.dim4") + ), + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING)) + .threshold(4) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .context(queryContext) + .metric(new InvertedTopNMetricSpec(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))) + .build() + ), + ImmutableList.of( + new Object[]{"b", 9L}, + new Object[]{"a", 9L} + ) + ); + } + @Test public void testTimeStampAddZeroDayPeriod() throws Exception {