diff --git a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
index 270ec164086..ab5eef29085 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
@@ -36,6 +36,13 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is tailored to processing aggregates on high cardinality columns which are likely to have
+ * larger result sets. Internally it uses a two-phase approach to compute the top-n result using the
+ * {@link PooledTopNAlgorithm} for each phase. The first phase processes the segment with only the order-by
+ * aggregator to compute which values constitute the top 'n' results. With this information, the actual result set
+ * is computed by a second run of the {@link PooledTopNAlgorithm}, this time with all aggregators, but only considering
+ * the values from the 'n' results to avoid performing any aggregations that would have been thrown away for results
+ * that didn't make the top-n.
  */
 public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm
 {
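To make the two-phase flow in the javadoc above concrete, here is a rough, self-contained sketch of the idea. It is not the Druid implementation: the Row class, the metric fields, and the inline data are hypothetical stand-ins for a segment cursor and its aggregators.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Rough sketch of the two-phase "aggregate the order-by metric first" approach described above.
// NOT the Druid implementation: Row, the metrics, and the inline data are hypothetical.
public class TwoPhaseTopNSketch
{
  static class Row
  {
    final String dimension;
    final long orderByMetric;
    final long otherMetric;

    Row(String dimension, long orderByMetric, long otherMetric)
    {
      this.dimension = dimension;
      this.orderByMetric = orderByMetric;
      this.otherMetric = otherMetric;
    }
  }

  public static void main(String[] args)
  {
    List<Row> segment = List.of(
        new Row("a", 10, 1),
        new Row("b", 3, 7),
        new Row("a", 5, 2),
        new Row("c", 1, 9)
    );
    int n = 2;

    // Phase 1: aggregate only the order-by metric to find which values make the top 'n'.
    Map<String, Long> orderByTotals = new HashMap<>();
    for (Row row : segment) {
      orderByTotals.merge(row.dimension, row.orderByMetric, Long::sum);
    }
    List<String> sortedKeys = new ArrayList<>(orderByTotals.keySet());
    sortedKeys.sort((a, b) -> Long.compare(orderByTotals.get(b), orderByTotals.get(a)));
    Set<String> topN = new HashSet<>(sortedKeys.subList(0, Math.min(n, sortedKeys.size())));

    // Phase 2: re-scan with all aggregators, but only for values that made the cut, so no work
    // is spent aggregating rows whose results would be thrown away anyway.
    Map<String, long[]> fullResults = new HashMap<>();
    for (Row row : segment) {
      if (!topN.contains(row.dimension)) {
        continue;
      }
      long[] aggs = fullResults.computeIfAbsent(row.dimension, k -> new long[2]);
      aggs[0] += row.orderByMetric;
      aggs[1] += row.otherMetric;
    }

    fullResults.forEach(
        (dim, aggs) -> System.out.println(dim + " -> orderBy=" + aggs[0] + ", other=" + aggs[1])
    );
  }
}

The trade-off matches the javadoc: the segment is scanned twice, but the second pass only aggregates the handful of values that can still appear in the result.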
diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 7cd93401b1c..19ebb543578 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.FilteredOffset;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.historical.HistoricalColumnSelector;
@@ -49,7 +50,18 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is highly specialized for processing aggregates on string columns that are
+ * {@link ColumnCapabilities#isDictionaryEncoded()} and {@link ColumnCapabilities#areDictionaryValuesUnique()}. This
+ * algorithm is built around using a direct {@link ByteBuffer} from the 'processing pool' of intermediary results
+ * buffers, aggregating with the dictionary id directly as the key, and deferring looking up the value until it is
+ * necessary.
  *
+ * At runtime, this implementation is specialized with wizardry to optimize for processing common top-n query shapes,
+ * see {@link #computeSpecializedScanAndAggregateImplementations},
+ * {@link Generic1AggPooledTopNScanner} and {@link Generic1AggPooledTopNScannerPrototype},
+ * {@link Generic2AggPooledTopNScanner} and {@link Generic2AggPooledTopNScannerPrototype},
+ * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop},
+ * {@link org.apache.druid.query.monomorphicprocessing.HotLoopCallee}, and
+ * {@link org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector} for more details.
  */
 public class PooledTopNAlgorithm
     extends BaseTopNAlgorithm
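The dictionary-id-keyed aggregation that this javadoc describes can be sketched roughly as follows. It is a simplified illustration rather than Druid's actual buffer layout or pooling: the dictionary, the rows, and the single long-sized slot per id are assumptions made for the example.

import java.nio.ByteBuffer;

// Simplified illustration of aggregating keyed by dictionary id into an off-heap buffer, deferring
// the dictionary value lookup until results are read out. NOT Druid's actual buffer layout or
// pooling; the dictionary, rows, and one-long-per-id layout are assumptions for the example.
public class DictionaryIdAggregationSketch
{
  public static void main(String[] args)
  {
    // Hypothetical dictionary for a dictionary-encoded string column with unique values.
    String[] dictionary = {"android", "ios", "web"};

    // Rows are seen only as dictionary ids plus a metric value while scanning.
    int[] rowDictId = {0, 2, 1, 0, 2, 2};
    long[] rowMetric = {5, 1, 7, 3, 2, 4};

    // One long-sized aggregation slot per dictionary id, taken from a direct buffer
    // (standing in for a buffer checked out of the processing pool).
    ByteBuffer resultsBuffer = ByteBuffer.allocateDirect(dictionary.length * Long.BYTES);

    for (int i = 0; i < rowDictId.length; i++) {
      int position = rowDictId[i] * Long.BYTES;
      resultsBuffer.putLong(position, resultsBuffer.getLong(position) + rowMetric[i]);
    }

    // Only now are dictionary ids resolved to their string values.
    for (int id = 0; id < dictionary.length; id++) {
      System.out.println(dictionary[id] + " -> " + resultsBuffer.getLong(id * Long.BYTES));
    }
  }
}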
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index f1eab7fe268..eb22dc9b4ae 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -53,6 +53,12 @@ public class TopNQueryEngine
     this.bufferPool = bufferPool;
   }
 
+  /**
+   * Do the thing - process a {@link StorageAdapter} into a {@link Sequence} of {@link TopNResultValue}, with one of
+   * the fine {@link TopNAlgorithm} available chosen based on the type of column being aggregated. The algorithm
+   * provides a mapping function to process rows from the adapter {@link org.apache.druid.segment.Cursor} to apply
+   * {@link AggregatorFactory} and create or update {@link TopNResultValue}.
+   */
   public Sequence<Result<TopNResultValue>> query(
       final TopNQuery query,
       final StorageAdapter adapter,
@@ -71,7 +77,9 @@ public class TopNQueryEngine
     final TopNMapFn mapFn = getMapFn(query, adapter, queryMetrics);
 
     Preconditions.checkArgument(
-        queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
+        queryIntervals.size() == 1,
+        "Can only handle a single interval, got[%s]",
+        queryIntervals
     );
 
     return Sequences.filter(
@@ -95,6 +103,9 @@
     );
   }
 
+  /**
+   * Choose the best {@link TopNAlgorithm} for the given query.
+   */
   private TopNMapFn getMapFn(
       final TopNQuery query,
       final StorageAdapter adapter,
@@ -120,22 +131,8 @@
     final TopNAlgorithm topNAlgorithm;
-    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
-      // heap based algorithm selection
-      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
-        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // We might be able to use this for any long column with an extraction function, that is
-        // ValueType.LONG.equals(columnCapabilities.getType())
-        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
-
-        // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
-        // currently relies on the dimension cardinality to support lexicographic sorting
-        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-      } else {
-        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-      }
-    } else {
-      // pool based algorithm selection
+    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
+      // pool based algorithm selection, if we can
       if (selector.isAggregateAllMetrics()) {
         // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
         // this
@@ -148,6 +145,20 @@
         // anything else, use the regular pooled algorithm
         topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
       }
+    } else {
+      // heap based algorithm selection, if we must
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
+        // We might be able to use this for any long column with an extraction function, that is
+        // ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+        // A special TimeExtractionTopNAlgorithm is required since HeapBasedTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     }
     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
@@ -166,7 +177,7 @@ public class TopNQueryEngine
    * (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
    * selectors.
    */
-  private static boolean requiresHeapAlgorithm(
+  private static boolean canUsePooledAlgorithm(
       final TopNAlgorithmSelector selector,
       final TopNQuery query,
       final ColumnCapabilities capabilities
@@ -174,22 +185,30 @@ public class TopNQueryEngine
   {
     if (selector.isHasExtractionFn()) {
       // extraction functions can have a many to one mapping, and should use a heap algorithm
-      return true;
+      return false;
     }
 
     if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
       // non-string output cannot use the pooled algorith, even if the underlying selector supports it
-      return true;
+      return false;
    }
 
     if (capabilities != null && capabilities.getType() == ValueType.STRING) {
       // string columns must use the on heap algorithm unless they have the following capabilites
-      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+      return capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue();
     } else {
       // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
-      return true;
+      return false;
     }
   }
 
+  /**
+   * {@link ExtractionFn} which are one to one may have their execution deferred until as late as possible, since
+   * which value is used as the grouping key itself doesn't particularly matter. For top-n, this method allows the
+   * query to be transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the
+   * {@link ExtractionFn} on the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be
+   * chosen for processing segments, and then added back and evaluated against the final merged result sets on the
+   * broker via {@link TopNQueryQueryToolChest#postMergeQueryDecoration}.
+   */
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null
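For the canApplyExtractionInPost case described above, the following toy example shows why a one-to-one extraction function can be stripped before segment processing and re-applied to the merged result on the broker. The merged results and the upper-casing "extraction function" are hypothetical; in Druid the strip/re-apply happens in TopNQueryQueryToolChest.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Toy example of deferring a one-to-one extraction function to after the merge, as described in the
// canApplyExtractionInPost javadoc above. The data and the upper-casing "extraction function" are
// hypothetical stand-ins.
public class DeferredExtractionSketch
{
  public static void main(String[] args)
  {
    Function<String, String> oneToOneExtractionFn = String::toUpperCase;

    // Pretend these are the merged top-n results, keyed by the raw (undecorated) dimension value.
    Map<String, Long> mergedTopN = new LinkedHashMap<>();
    mergedTopN.put("android", 42L);
    mergedTopN.put("ios", 17L);

    // Because the mapping is one to one, applying it to just the final 'n' keys groups the data
    // exactly as applying it per row would have, at a fraction of the cost.
    Map<String, Long> decorated = new LinkedHashMap<>();
    mergedTopN.forEach((key, value) -> decorated.put(oneToOneExtractionFn.apply(key), value));

    decorated.forEach((key, value) -> System.out.println(key + " -> " + value));
  }
}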