From db4d157be6f2f1d3eedbd99fe86fc03a835f7c10 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 10 Nov 2021 06:05:29 -0800 Subject: [PATCH] Add Finalization option to RowSignature.addAggregators. (#11882) * Add Finalization option to RowSignature.addAggregators. This make type signatures more useful when the caller knows whether it will be reading aggregation results in their finalized or intermediate types. * Fix call site. --- .../druid/query/groupby/GroupByQuery.java | 28 ++++++-- .../epinephelinae/RowBasedGrouperHelper.java | 41 ++++++++--- .../groupby/having/DimFilterHavingSpec.java | 4 +- .../TimeseriesQueryQueryToolChest.java | 6 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../druid/segment/column/RowSignature.java | 72 ++++++++++++++++--- 6 files changed, 125 insertions(+), 28 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index 43328c1a0db..938fa46e00c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); this.universalTimestamp = computeUniversalTimestamp(); - this.resultRowSignature = computeResultRowSignature(); + this.resultRowSignature = computeResultRowSignature(RowSignature.Finalization.UNKNOWN); this.havingSpec = havingSpec; this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec); this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, this.dimensions); @@ -320,8 +320,7 @@ public class GroupByQuery extends BaseQuery } /** - * Returns a list of field names, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the - * order that they will appear in ResultRows for this query. + * Equivalent to {@code getResultRowSignature(Finalization.UNKNOWN)}. * * @see ResultRow for documentation about the order that fields will be in */ @@ -330,6 +329,25 @@ public class GroupByQuery extends BaseQuery return resultRowSignature; } + /** + * Returns a result row signature, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the + * order that they will appear in ResultRows for this query. + * + * Aggregator types in the signature depend on the value of {@code finalization}. + * + * If finalization is {@link RowSignature.Finalization#UNKNOWN}, this method returns a cached object. + * + * @see ResultRow for documentation about the order that fields will be in + */ + public RowSignature getResultRowSignature(final RowSignature.Finalization finalization) + { + if (finalization == RowSignature.Finalization.UNKNOWN) { + return resultRowSignature; + } else { + return computeResultRowSignature(finalization); + } + } + /** * Returns the size of ResultRows for this query when they do not include post-aggregators. */ @@ -481,7 +499,7 @@ public class GroupByQuery extends BaseQuery return forcePushDown; } - private RowSignature computeResultRowSignature() + private RowSignature computeResultRowSignature(final RowSignature.Finalization finalization) { final RowSignature.Builder builder = RowSignature.builder(); @@ -490,7 +508,7 @@ public class GroupByQuery extends BaseQuery } return builder.addDimensions(dimensions) - .addAggregators(aggregatorSpecs) + .addAggregators(aggregatorSpecs, finalization) .addPostAggregators(postAggregatorSpecs) .build(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index e5853985b85..1e4ef47205b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -68,6 +68,7 @@ import org.apache.druid.segment.RowAdapter; import org.apache.druid.segment.RowBasedColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.filter.BooleanValueMatcher; @@ -203,7 +204,8 @@ public class RowBasedGrouperHelper ColumnSelectorFactory columnSelectorFactory = createResultRowBasedColumnSelectorFactory( combining ? query : subquery, - columnSelectorRow::get + columnSelectorRow::get, + RowSignature.Finalization.UNKNOWN ); // Apply virtual columns if we are in subquery (non-combining) mode. @@ -341,14 +343,18 @@ public class RowBasedGrouperHelper /** * Creates a {@link ColumnSelectorFactory} that can read rows which originate as results of the provided "query". * - * @param query a groupBy query - * @param supplier supplier of result rows from the query + * @param query a groupBy query + * @param supplier supplier of result rows from the query + * @param finalization whether the column capabilities reported by this factory should reflect finalized types */ public static ColumnSelectorFactory createResultRowBasedColumnSelectorFactory( final GroupByQuery query, - final Supplier supplier + final Supplier supplier, + final RowSignature.Finalization finalization ) { + final RowSignature signature = query.getResultRowSignature(finalization); + final RowAdapter adapter = new RowAdapter() { @@ -366,7 +372,7 @@ public class RowBasedGrouperHelper @Override public Function columnFunction(final String columnName) { - final int columnIndex = query.getResultRowSignature().indexOf(columnName); + final int columnIndex = signature.indexOf(columnName); if (columnIndex < 0) { return row -> null; } else { @@ -378,7 +384,7 @@ public class RowBasedGrouperHelper return RowBasedColumnSelectorFactory.create( adapter, supplier::get, - () -> query.getResultRowSignature(), + () -> signature, false ); } @@ -400,7 +406,14 @@ public class RowBasedGrouperHelper final SettableSupplier rowSupplier = new SettableSupplier<>(); final ColumnSelectorFactory columnSelectorFactory = - query.getVirtualColumns().wrap(RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(subquery, rowSupplier)); + query.getVirtualColumns() + .wrap( + RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( + subquery, + rowSupplier, + RowSignature.Finalization.UNKNOWN + ) + ); final ValueMatcher filterMatcher = filter == null ? BooleanValueMatcher.of(true) @@ -965,7 +978,12 @@ public class RowBasedGrouperHelper } } - private static int compareDimsInRows(RowBasedKey key1, RowBasedKey key2, final List fieldTypes, int dimStart) + private static int compareDimsInRows( + RowBasedKey key1, + RowBasedKey key2, + final List fieldTypes, + int dimStart + ) { for (int i = dimStart; i < key1.getKey().length; i++) { final int cmp; @@ -1337,7 +1355,12 @@ public class RowBasedGrouperHelper case LONG: case FLOAT: case DOUBLE: - return makeNullHandlingNumericserdeHelper(valueType.getType(), keyBufferPosition, pushLimitDown, stringComparator); + return makeNullHandlingNumericserdeHelper( + valueType.getType(), + keyBufferPosition, + pushLimitDown, + stringComparator + ); default: throw new IAE("invalid type: %s", valueType); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 1e92e6cd2dd..9caf27841fc 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -32,6 +32,7 @@ import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper; +import org.apache.druid.segment.column.RowSignature; import java.util.Objects; import java.util.function.Function; @@ -83,7 +84,8 @@ public class DimFilterHavingSpec implements HavingSpec this.matcher = dimFilter.toFilter().makeMatcher( RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory( query, - rowSupplier + rowSupplier, + finalize ? RowSignature.Finalization.YES : RowSignature.Finalization.NO ) ); } diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 89c317ab0b0..64b3ab4f22c 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -223,6 +223,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest aggregatorSpecs = query.getAggregatorSpecs(); Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()]; String[] aggregatorNames = new String[aggregatorSpecs.size()]; + RowSignature aggregatorsSignature = + RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build(); for (int i = 0; i < aggregatorSpecs.size(); i++) { aggregators[i] = aggregatorSpecs.get(i) @@ -230,7 +232,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest new MapBasedRow(null, null), - () -> RowSignature.builder().addAggregators(aggregatorSpecs).build(), + () -> aggregatorsSignature, false ) ); @@ -417,7 +419,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest aggregators) + /** + * Adds aggregations to a signature. + * + * {@link Finalization#YES} will add finalized types and {@link Finalization#NO} will add intermediate types. + * {@link Finalization#UNKNOWN} will add the intermediate / finalized type when they are the same. Otherwise, it + * will add a null type. + * + * @param aggregators list of aggregation functions + * @param finalization whether the aggregator results will be finalized + */ + public Builder addAggregators(final List aggregators, final Finalization finalization) { for (final AggregatorFactory aggregator : aggregators) { - final ColumnType type = aggregator.getType(); - - if (type.equals(aggregator.getFinalizedType())) { - add(aggregator.getName(), type); - } else { - // Use null if the type depends on whether or not the aggregator is finalized, since - // we don't know if it will be finalized or not. So null (i.e. unknown) is the proper - // thing to do (currently). - add(aggregator.getName(), null); + final ColumnType type; + + switch (finalization) { + case YES: + type = aggregator.getFinalizedType(); + break; + + case NO: + type = aggregator.getType(); + break; + + default: + assert finalization == Finalization.UNKNOWN; + + if (aggregator.getType() == aggregator.getFinalizedType()) { + type = aggregator.getType(); + } else { + // Use null if the type depends on whether the aggregator is finalized, since we don't know if + // it will be finalized or not. + type = null; + } + break; } + + add(aggregator.getName(), type); } return this; } + /** + * Adds post-aggregators to a signature. + * + * Note: to ensure types are computed properly, post-aggregators must be added *after* any columns that they + * depend on, and they must be added in the order that the query engine will compute them. This method assumes + * that post-aggregators are computed in order, and that they can refer to earlier post-aggregators but not + * to later ones. + */ public Builder addPostAggregators(final List postAggregators) { for (final PostAggregator postAggregator : postAggregators) { @@ -289,4 +323,22 @@ public class RowSignature implements ColumnInspector return new RowSignature(columnTypeList); } } + + public enum Finalization + { + /** + * Aggregation results will be finalized. + */ + YES, + + /** + * Aggregation results will not be finalized. + */ + NO, + + /** + * Aggregation results may or may not be finalized. + */ + UNKNOWN + } }