Add Finalization option to RowSignature.addAggregators. (#11882)

* Add Finalization option to RowSignature.addAggregators.

This make type signatures more useful when the caller knows whether it will
be reading aggregation results in their finalized or intermediate types.

* Fix call site.
This commit is contained in:
Gian Merlino 2021-11-10 06:05:29 -08:00 committed by GitHub
parent d3914c1a78
commit db4d157be6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 125 additions and 28 deletions

View File

@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
this.universalTimestamp = computeUniversalTimestamp();
this.resultRowSignature = computeResultRowSignature();
this.resultRowSignature = computeResultRowSignature(RowSignature.Finalization.UNKNOWN);
this.havingSpec = havingSpec;
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, this.dimensions);
@ -320,8 +320,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
/**
* Returns a list of field names, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the
* order that they will appear in ResultRows for this query.
* Equivalent to {@code getResultRowSignature(Finalization.UNKNOWN)}.
*
* @see ResultRow for documentation about the order that fields will be in
*/
@ -330,6 +329,25 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return resultRowSignature;
}
/**
* Returns a result row signature, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the
* order that they will appear in ResultRows for this query.
*
* Aggregator types in the signature depend on the value of {@code finalization}.
*
* If finalization is {@link RowSignature.Finalization#UNKNOWN}, this method returns a cached object.
*
* @see ResultRow for documentation about the order that fields will be in
*/
public RowSignature getResultRowSignature(final RowSignature.Finalization finalization)
{
if (finalization == RowSignature.Finalization.UNKNOWN) {
return resultRowSignature;
} else {
return computeResultRowSignature(finalization);
}
}
/**
* Returns the size of ResultRows for this query when they do not include post-aggregators.
*/
@ -481,7 +499,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return forcePushDown;
}
private RowSignature computeResultRowSignature()
private RowSignature computeResultRowSignature(final RowSignature.Finalization finalization)
{
final RowSignature.Builder builder = RowSignature.builder();
@ -490,7 +508,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
}
return builder.addDimensions(dimensions)
.addAggregators(aggregatorSpecs)
.addAggregators(aggregatorSpecs, finalization)
.addPostAggregators(postAggregatorSpecs)
.build();
}

View File

@ -68,6 +68,7 @@ import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.BooleanValueMatcher;
@ -203,7 +204,8 @@ public class RowBasedGrouperHelper
ColumnSelectorFactory columnSelectorFactory = createResultRowBasedColumnSelectorFactory(
combining ? query : subquery,
columnSelectorRow::get
columnSelectorRow::get,
RowSignature.Finalization.UNKNOWN
);
// Apply virtual columns if we are in subquery (non-combining) mode.
@ -343,12 +345,16 @@ public class RowBasedGrouperHelper
*
* @param query a groupBy query
* @param supplier supplier of result rows from the query
* @param finalization whether the column capabilities reported by this factory should reflect finalized types
*/
public static ColumnSelectorFactory createResultRowBasedColumnSelectorFactory(
final GroupByQuery query,
final Supplier<ResultRow> supplier
final Supplier<ResultRow> supplier,
final RowSignature.Finalization finalization
)
{
final RowSignature signature = query.getResultRowSignature(finalization);
final RowAdapter<ResultRow> adapter =
new RowAdapter<ResultRow>()
{
@ -366,7 +372,7 @@ public class RowBasedGrouperHelper
@Override
public Function<ResultRow, Object> columnFunction(final String columnName)
{
final int columnIndex = query.getResultRowSignature().indexOf(columnName);
final int columnIndex = signature.indexOf(columnName);
if (columnIndex < 0) {
return row -> null;
} else {
@ -378,7 +384,7 @@ public class RowBasedGrouperHelper
return RowBasedColumnSelectorFactory.create(
adapter,
supplier::get,
() -> query.getResultRowSignature(),
() -> signature,
false
);
}
@ -400,7 +406,14 @@ public class RowBasedGrouperHelper
final SettableSupplier<ResultRow> rowSupplier = new SettableSupplier<>();
final ColumnSelectorFactory columnSelectorFactory =
query.getVirtualColumns().wrap(RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(subquery, rowSupplier));
query.getVirtualColumns()
.wrap(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
subquery,
rowSupplier,
RowSignature.Finalization.UNKNOWN
)
);
final ValueMatcher filterMatcher = filter == null
? BooleanValueMatcher.of(true)
@ -965,7 +978,12 @@ public class RowBasedGrouperHelper
}
}
private static int compareDimsInRows(RowBasedKey key1, RowBasedKey key2, final List<ColumnType> fieldTypes, int dimStart)
private static int compareDimsInRows(
RowBasedKey key1,
RowBasedKey key2,
final List<ColumnType> fieldTypes,
int dimStart
)
{
for (int i = dimStart; i < key1.getKey().length; i++) {
final int cmp;
@ -1337,7 +1355,12 @@ public class RowBasedGrouperHelper
case LONG:
case FLOAT:
case DOUBLE:
return makeNullHandlingNumericserdeHelper(valueType.getType(), keyBufferPosition, pushLimitDown, stringComparator);
return makeNullHandlingNumericserdeHelper(
valueType.getType(),
keyBufferPosition,
pushLimitDown,
stringComparator
);
default:
throw new IAE("invalid type: %s", valueType);
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;
import org.apache.druid.segment.column.RowSignature;
import java.util.Objects;
import java.util.function.Function;
@ -83,7 +84,8 @@ public class DimFilterHavingSpec implements HavingSpec
this.matcher = dimFilter.toFilter().makeMatcher(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
rowSupplier
rowSupplier,
finalize ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
)
);
}

View File

@ -223,6 +223,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
String[] aggregatorNames = new String[aggregatorSpecs.size()];
RowSignature aggregatorsSignature =
RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] =
aggregatorSpecs.get(i)
@ -230,7 +232,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
() -> RowSignature.builder().addAggregators(aggregatorSpecs).build(),
() -> aggregatorsSignature,
false
)
);
@ -417,7 +419,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
rowSignatureBuilder.add(query.getTimestampResultField(), ColumnType.LONG);
}
rowSignatureBuilder.addAggregators(query.getAggregatorSpecs());
rowSignatureBuilder.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN);
rowSignatureBuilder.addPostAggregators(query.getPostAggregatorSpecs());
return rowSignatureBuilder.build();
}

View File

@ -511,7 +511,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return RowSignature.builder()
.addTimeColumn()
.addDimensions(Collections.singletonList(query.getDimensionSpec()))
.addAggregators(query.getAggregatorSpecs())
.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN)
.addPostAggregators(query.getPostAggregatorSpecs())
.build();
}

View File

@ -188,6 +188,7 @@ public class RowSignature implements ColumnInspector
}
final String columnName = columnNames.get(i);
s.append(columnName).append(":").append(columnTypes.get(columnName));
}
return s.append("}").toString();
}
@ -249,24 +250,57 @@ public class RowSignature implements ColumnInspector
return this;
}
public Builder addAggregators(final List<AggregatorFactory> aggregators)
/**
* Adds aggregations to a signature.
*
* {@link Finalization#YES} will add finalized types and {@link Finalization#NO} will add intermediate types.
* {@link Finalization#UNKNOWN} will add the intermediate / finalized type when they are the same. Otherwise, it
* will add a null type.
*
* @param aggregators list of aggregation functions
* @param finalization whether the aggregator results will be finalized
*/
public Builder addAggregators(final List<AggregatorFactory> aggregators, final Finalization finalization)
{
for (final AggregatorFactory aggregator : aggregators) {
final ColumnType type = aggregator.getType();
final ColumnType type;
if (type.equals(aggregator.getFinalizedType())) {
add(aggregator.getName(), type);
switch (finalization) {
case YES:
type = aggregator.getFinalizedType();
break;
case NO:
type = aggregator.getType();
break;
default:
assert finalization == Finalization.UNKNOWN;
if (aggregator.getType() == aggregator.getFinalizedType()) {
type = aggregator.getType();
} else {
// Use null if the type depends on whether or not the aggregator is finalized, since
// we don't know if it will be finalized or not. So null (i.e. unknown) is the proper
// thing to do (currently).
add(aggregator.getName(), null);
// Use null if the type depends on whether the aggregator is finalized, since we don't know if
// it will be finalized or not.
type = null;
}
break;
}
add(aggregator.getName(), type);
}
return this;
}
/**
* Adds post-aggregators to a signature.
*
* Note: to ensure types are computed properly, post-aggregators must be added *after* any columns that they
* depend on, and they must be added in the order that the query engine will compute them. This method assumes
* that post-aggregators are computed in order, and that they can refer to earlier post-aggregators but not
* to later ones.
*/
public Builder addPostAggregators(final List<PostAggregator> postAggregators)
{
for (final PostAggregator postAggregator : postAggregators) {
@ -289,4 +323,22 @@ public class RowSignature implements ColumnInspector
return new RowSignature(columnTypeList);
}
}
public enum Finalization
{
/**
* Aggregation results will be finalized.
*/
YES,
/**
* Aggregation results will not be finalized.
*/
NO,
/**
* Aggregation results may or may not be finalized.
*/
UNKNOWN
}
}