minor rework of topn algorithm selection for clarity and more javadocs (#10058)

* minor refactor of topn engine algorithm selection for clarity

* adjust

* more javadoc
Clint Wylie 2020-06-22 09:08:50 -07:00 committed by GitHub
parent 5600e1c204
commit eee99ff0d5
3 changed files with 60 additions and 22 deletions

AggregateTopNMetricFirstAlgorithm.java

@@ -36,6 +36,13 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is tailored to processing aggregates on high cardinality columns which are likely to have
+ * larger result sets. Internally it uses a 2 phase approach to compute the top-n result using the
+ * {@link PooledTopNAlgorithm} for each phase. The first phase is to process the segment with only the order-by
+ * aggregator to compute which values constitute the top 'n' results. With this information, an actual result set
+ * is computed by a second run of the {@link PooledTopNAlgorithm}, this time with all aggregators, but only considering
+ * the values from the 'n' results to avoid performing any aggregations that would have been thrown away for results
+ * that didn't make the top-n.
  */
 public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], TopNParams>
 {
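The two-phase flow described in the new class javadoc can be illustrated outside Druid with plain collections. The sketch below is a conceptual illustration only, not the actual implementation: the Row record, its fields, and the in-memory list are hypothetical stand-ins for segment rows, cursors, and BufferAggregators. Phase one ranks dimension values using only the order-by metric; phase two re-scans and aggregates all metrics, but only for values that survived phase one.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Conceptual sketch of the "aggregate the top-n metric first" idea; Row and the
// in-memory List<Row> are made-up stand-ins for segment rows.
public class AggregateMetricFirstSketch
{
  record Row(String dim, long orderByMetric, long otherMetric) {}

  public static Map<String, long[]> topN(List<Row> rows, int n)
  {
    // Phase 1: scan with only the order-by aggregator to find which dimension
    // values make the top 'n'.
    Map<String, Long> orderByTotals = new HashMap<>();
    for (Row row : rows) {
      orderByTotals.merge(row.dim(), row.orderByMetric(), Long::sum);
    }
    List<String> ranked = new ArrayList<>(orderByTotals.keySet());
    ranked.sort(Comparator.comparingLong(orderByTotals::get).reversed());
    Set<String> topValues = new HashSet<>(ranked.subList(0, Math.min(n, ranked.size())));

    // Phase 2: scan again with all aggregators, skipping rows whose dimension
    // value did not survive phase 1, so their aggregations are never computed.
    Map<String, long[]> results = new HashMap<>();
    for (Row row : rows) {
      if (!topValues.contains(row.dim())) {
        continue;
      }
      long[] aggs = results.computeIfAbsent(row.dim(), k -> new long[2]);
      aggs[0] += row.orderByMetric();
      aggs[1] += row.otherMetric();
    }
    return results;
  }
}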

PooledTopNAlgorithm.java

@@ -36,6 +36,7 @@ import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.FilteredOffset;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.historical.HistoricalColumnSelector;
@@ -49,7 +50,18 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
+ * This {@link TopNAlgorithm} is highly specialized for processing aggregates on string columns that are
+ * {@link ColumnCapabilities#isDictionaryEncoded()} and {@link ColumnCapabilities#areDictionaryValuesUnique()}. This
+ * algorithm is built around using a direct {@link ByteBuffer} from the 'processing pool' of intermediary result
+ * buffers, aggregating with the dictionary id directly as the key and deferring the value lookup until it is necessary.
  *
+ * At runtime, this implementation is specialized with wizardry to optimize for processing common top-n query shapes;
+ * see {@link #computeSpecializedScanAndAggregateImplementations},
+ * {@link Generic1AggPooledTopNScanner} and {@link Generic1AggPooledTopNScannerPrototype},
+ * {@link Generic2AggPooledTopNScanner} and {@link Generic2AggPooledTopNScannerPrototype},
+ * {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop},
+ * {@link org.apache.druid.query.monomorphicprocessing.HotLoopCallee},
+ * and {@link org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector} for more details.
  */
 public class PooledTopNAlgorithm
     extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
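To make the dictionary-id trick in the javadoc above concrete, here is a small self-contained sketch, not Druid code: the dictionary, per-row ids, and metric values are made up, and a plain direct ByteBuffer stands in for the pooled processing buffer and BufferAggregator machinery. Totals accumulate at an offset derived from the dictionary id, and string values are only looked up at the very end, when a winning id is turned into a result.

import java.nio.ByteBuffer;

// Conceptual sketch of pooled, dictionary-id-keyed aggregation; the dictionary,
// the per-row dictionary ids, and the metric values are hypothetical examples.
public class DictionaryIdAggregationSketch
{
  public static void main(String[] args)
  {
    String[] dictionary = {"android", "ios", "linux", "osx"};   // unique, dictionary-encoded values
    int[] rowDictIds = {0, 1, 1, 3, 0, 2, 1, 0};                // one dictionary id per row
    long[] rowMetric = {10, 5, 7, 2, 1, 8, 4, 3};               // metric value per row

    // One aggregation slot per dictionary id, laid out in a direct buffer the way
    // the pooled algorithm uses a processing-pool ByteBuffer.
    ByteBuffer aggregates = ByteBuffer.allocateDirect(dictionary.length * Long.BYTES);

    // Aggregate keyed by dictionary id: no string value is materialized in this loop.
    for (int row = 0; row < rowDictIds.length; row++) {
      int offset = rowDictIds[row] * Long.BYTES;
      aggregates.putLong(offset, aggregates.getLong(offset) + rowMetric[row]);
    }

    // Only now, when emitting the result, is the dictionary consulted to turn
    // an id back into its value.
    int topId = 0;
    for (int id = 1; id < dictionary.length; id++) {
      if (aggregates.getLong(id * Long.BYTES) > aggregates.getLong(topId * Long.BYTES)) {
        topId = id;
      }
    }
    System.out.println("top value: " + dictionary[topId] + " = " + aggregates.getLong(topId * Long.BYTES));
  }
}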

TopNQueryEngine.java

@@ -53,6 +53,12 @@ public class TopNQueryEngine
     this.bufferPool = bufferPool;
   }
 
+  /**
+   * Do the thing - process a {@link StorageAdapter} into a {@link Sequence} of {@link TopNResultValue}, choosing from
+   * the available {@link TopNAlgorithm} implementations based on the type of column being aggregated. The algorithm
+   * provides a mapping function that processes rows from the adapter {@link org.apache.druid.segment.Cursor},
+   * applying {@link AggregatorFactory} to create or update {@link TopNResultValue}.
+   */
   public Sequence<Result<TopNResultValue>> query(
       final TopNQuery query,
       final StorageAdapter adapter,
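As a rough picture of what the javadoc above describes, the sketch below models the query() flow with hypothetical stand-in types (Cursorish, ResultValue) rather than Druid's Cursor and Result: one result per cursor via the algorithm's mapping function, with empty (null) results filtered out, which mirrors the Sequences.filter call in the next hunk.

import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;

// Conceptual sketch only; Cursorish and ResultValue are made-up stand-ins.
public class QueryFlowSketch
{
  record Cursorish(String bucketTime) {}
  record ResultValue(String bucketTime, String summary) {}

  static Stream<ResultValue> query(List<Cursorish> cursors, Function<Cursorish, ResultValue> mapFn)
  {
    return cursors.stream()
                  .map(mapFn)                 // one top-n computation per cursor
                  .filter(Objects::nonNull);  // drop buckets that produced no result
  }

  public static void main(String[] args)
  {
    List<Cursorish> cursors = List.of(new Cursorish("2020-06-22T00"), new Cursorish("2020-06-22T01"));
    // A mapping function that produces a result for the first bucket only.
    Function<Cursorish, ResultValue> mapFn =
        c -> c.bucketTime().endsWith("00") ? new ResultValue(c.bucketTime(), "top-n for " + c.bucketTime()) : null;
    query(cursors, mapFn).forEach(System.out::println);
  }
}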
@@ -71,7 +77,9 @@ public class TopNQueryEngine
     final TopNMapFn mapFn = getMapFn(query, adapter, queryMetrics);
 
     Preconditions.checkArgument(
-        queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
+        queryIntervals.size() == 1,
+        "Can only handle a single interval, got[%s]",
+        queryIntervals
     );
 
     return Sequences.filter(
@@ -95,6 +103,9 @@ public class TopNQueryEngine
     );
   }
 
+  /**
+   * Choose the best {@link TopNAlgorithm} for the given query.
+   */
   private TopNMapFn getMapFn(
       final TopNQuery query,
       final StorageAdapter adapter,
@@ -120,22 +131,8 @@ public class TopNQueryEngine
     final TopNAlgorithm<?, ?> topNAlgorithm;
-    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
-      // heap based algorithm selection
-      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
-        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // We might be able to use this for any long column with an extraction function, that is
-        // ValueType.LONG.equals(columnCapabilities.getType())
-        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
-        // A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
-        // currently relies on the dimension cardinality to support lexicographic sorting
-        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-      } else {
-        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-      }
-    } else {
-      // pool based algorithm selection
+    if (canUsePooledAlgorithm(selector, query, columnCapabilities)) {
+      // pool based algorithm selection, if we can
       if (selector.isAggregateAllMetrics()) {
         // if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
         // this
@@ -148,6 +145,20 @@ public class TopNQueryEngine
         // anything else, use the regular pooled algorithm
         topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
       }
+    } else {
+      // heap based algorithm selection, if we must
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
+        // We might be able to use this for any long column with an extraction function, that is
+        // ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+        // A special TimeExtractionTopNAlgorithm is required since HeapBasedTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     }
     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
@@ -166,7 +177,7 @@ public class TopNQueryEngine
    * (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
    * selectors.
    */
-  private static boolean requiresHeapAlgorithm(
+  private static boolean canUsePooledAlgorithm(
       final TopNAlgorithmSelector selector,
       final TopNQuery query,
       final ColumnCapabilities capabilities
@@ -174,22 +185,30 @@ public class TopNQueryEngine
   {
     if (selector.isHasExtractionFn()) {
       // extraction functions can have a many to one mapping, and should use a heap algorithm
-      return true;
+      return false;
     }
     if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
       // non-string output cannot use the pooled algorithm, even if the underlying selector supports it
-      return true;
+      return false;
     }
     if (capabilities != null && capabilities.getType() == ValueType.STRING) {
       // string columns must use the on heap algorithm unless they have the following capabilities
-      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+      return capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue();
     } else {
       // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
-      return true;
+      return false;
     }
   }
 
+  /**
+   * {@link ExtractionFn} which are one to one may have their execution deferred as late as possible, since which
+   * value is used as the grouping key doesn't particularly matter. For top-n, this method allows the query to be
+   * transformed in {@link TopNQueryQueryToolChest#preMergeQueryDecoration} to strip off the {@link ExtractionFn} on
+   * the broker, so that a more optimized algorithm (e.g. {@link PooledTopNAlgorithm}) can be chosen for processing
+   * segments; the {@link ExtractionFn} is then added back and evaluated against the final merged result sets on the
+   * broker via {@link TopNQueryQueryToolChest#postMergeQueryDecoration}.
+   */
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null
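To see why a one-to-one extraction function can safely be stripped before segment processing and re-applied after merging, as the canApplyExtractionInPost javadoc above describes, here is a self-contained sketch with made-up rows, a made-up metric, and toUpperCase standing in for an injective ExtractionFn. When the ranking is by metric value, as it is here, applying the function per row during aggregation and applying it to the keys of the finished top-n produce the same result.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Conceptual sketch of "apply the extraction fn in post"; the data, the metric,
// and the one-to-one function below are hypothetical examples.
public class ExtractionInPostSketch
{
  record Row(String dim, long metric) {}

  // Aggregate the metric by key and keep the two largest totals, in rank order.
  static Map<String, Long> top2(List<Row> rows, UnaryOperator<String> keyFn)
  {
    Map<String, Long> totals = new HashMap<>();
    for (Row row : rows) {
      totals.merge(keyFn.apply(row.dim()), row.metric(), Long::sum);
    }
    return totals.entrySet().stream()
                 .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                 .limit(2)
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, LinkedHashMap::new));
  }

  public static void main(String[] args)
  {
    List<Row> rows = List.of(new Row("a", 10), new Row("b", 7), new Row("a", 5), new Row("c", 1));
    UnaryOperator<String> oneToOne = s -> s.toUpperCase();   // injective on this data

    // Applying the extraction fn per row during processing...
    Map<String, Long> during = top2(rows, oneToOne);

    // ...gives the same result as aggregating on raw values and mapping the keys afterwards,
    // which is what stripping the fn before segment processing and re-adding it in
    // post-merge decoration relies on.
    Map<String, Long> after = new LinkedHashMap<>();
    top2(rows, UnaryOperator.identity()).forEach((k, v) -> after.put(oneToOne.apply(k), v));

    System.out.println(during.equals(after));   // true for a one-to-one mapping
  }
}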