Deduce type from the aggregators when materializing subquery results (#16703)

For aggregators like StringFirst/Last, whose intermediate type isn't the same as the final type, using them in GroupBy, TopN or Timeseries subqueries causes a fallback when maxSubqueryBytes is set. This is because we assume that the finalization is not known, due to which the row signature cannot determine whether to use the intermediate or the final type, and it puts it as null. This PR figures out the finalization from the query context and uses the intermediate or the final type appropriately.
This commit is contained in:
Laksh Singla 2024-07-23 11:52:39 +05:30 committed by GitHub
parent c45d4fdbca
commit 11bb40981e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 258 additions and 19 deletions

View File

@ -828,7 +828,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
boolean useNestedForUnknownTypes
)
{
RowSignature rowSignature = resultArraySignature(query);
RowSignature rowSignature = query.getResultRowSignature(
query.context().isFinalize(true)
? RowSignature.Finalization.YES
: RowSignature.Finalization.NO
);
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
@ -39,6 +40,8 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec.LimitJsonIncludeFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -167,6 +170,19 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return context().getBoolean(SKIP_EMPTY_BUCKETS, false);
}
public RowSignature getResultSignature(final RowSignature.Finalization finalization)
{
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
String timestampResultField = getTimestampResultField();
if (StringUtils.isNotEmpty(timestampResultField)) {
builder.add(timestampResultField, ColumnType.LONG);
}
builder.addAggregators(aggregatorSpecs, finalization);
builder.addPostAggregators(postAggregatorSpecs);
return builder.build();
}
@Nullable
@Override
public Set<String> getRequiredColumns()

View File

@ -62,7 +62,6 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;
@ -439,14 +438,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public RowSignature resultArraySignature(TimeseriesQuery query)
{
RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
rowSignatureBuilder.addTimeColumn();
if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
rowSignatureBuilder.add(query.getTimestampResultField(), ColumnType.LONG);
}
rowSignatureBuilder.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN);
rowSignatureBuilder.addPostAggregators(query.getPostAggregatorSpecs());
return rowSignatureBuilder.build();
return query.getResultSignature(RowSignature.Finalization.UNKNOWN);
}
@Override
@ -486,7 +478,10 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
boolean useNestedForUnknownTypes
)
{
final RowSignature rowSignature = resultArraySignature(query);
final RowSignature rowSignature =
query.getResultSignature(
query.context().isFinalize(true) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
);
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature

View File

@ -37,6 +37,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -185,6 +186,16 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
topNMetricSpec.initTopNAlgorithmSelector(selector);
}
public RowSignature getResultSignature(final RowSignature.Finalization finalization)
{
return RowSignature.builder()
.addTimeColumn()
.addDimensions(Collections.singletonList(getDimensionSpec()))
.addAggregators(getAggregatorSpecs(), finalization)
.addPostAggregators(getPostAggregatorSpecs())
.build();
}
@Override
public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{

View File

@ -68,7 +68,6 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -518,12 +517,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public RowSignature resultArraySignature(TopNQuery query)
{
return RowSignature.builder()
.addTimeColumn()
.addDimensions(Collections.singletonList(query.getDimensionSpec()))
.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN)
.addPostAggregators(query.getPostAggregatorSpecs())
.build();
return query.getResultSignature(RowSignature.Finalization.UNKNOWN);
}
@Override
@ -569,7 +563,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
boolean useNestedForUnknownTypes
)
{
final RowSignature rowSignature = resultArraySignature(query);
final RowSignature rowSignature = query.getResultSignature(
query.context().isFinalize(true) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
);
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature

View File

@ -51,10 +51,12 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@ -1385,6 +1387,220 @@ public class CalciteSubqueryTest extends BaseCalciteQueryTest
);
}
@MethodSource("constructorFeeder")
@ParameterizedTest(name = "{0}")
public void testGroupBySubqueryWithEarliestAggregator(String testName, Map<String, Object> queryContext)
{
cannotVectorize();
// Note: EARLIEST aggregator is used because the intermediate type "serializablePair" is different from the finalized type
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
new Object[]{"1", "", "a", "1"},
new Object[]{"10.1", "b", "", "10.1"},
new Object[]{"10.1", "c", "", "10.1"},
new Object[]{"2", "d", "", "2"},
new Object[]{"abc", "", "", "abc"},
new Object[]{"def", "", "abc", "def"}
);
} else {
expectedResults = ImmutableList.of(
new Object[]{"", "a", "a", ""},
new Object[]{"", "b", "a", ""},
new Object[]{"1", "", "a", "1"},
new Object[]{"10.1", "b", null, "10.1"},
new Object[]{"10.1", "c", null, "10.1"},
new Object[]{"2", "d", "", "2"},
new Object[]{"abc", null, null, "abc"},
new Object[]{"def", null, "abc", "def"}
);
}
testQuery(
"SELECT a.dim1, a.dim3, a.e_dim2, b.dim1 "
+ "FROM ("
+ " SELECT dim1, dim3, EARLIEST(dim2) AS e_dim2 "
+ " FROM foo GROUP BY 1, 2 LIMIT 100"
+ ") a "
+ "INNER JOIN foo b ON a.dim1 = b.dim1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setInterval(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING),
new DefaultDimensionSpec("dim3", "d1", ColumnType.STRING)
)
.addAggregator(new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024))
.setLimitSpec(new DefaultLimitSpec(Collections.emptyList(), 100))
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("dim1")
.build()
),
"j0.",
"(\"d0\" == \"j0.dim1\")",
JoinType.INNER,
null,
TestExprMacroTable.INSTANCE,
null
)
)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("a0", "d0", "d1", "j0.dim1")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@MethodSource("constructorFeeder")
@ParameterizedTest(name = "{0}")
public void testTopNSubqueryWithEarliestAggregator(String testName, Map<String, Object> queryContext)
{
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
new Object[]{"1", "a", "1"},
new Object[]{"10.1", "", "10.1"},
new Object[]{"2", "", "2"},
new Object[]{"abc", "", "abc"},
new Object[]{"def", "abc", "def"}
);
} else {
expectedResults = ImmutableList.of(
new Object[]{"", "a", ""},
new Object[]{"1", "a", "1"},
new Object[]{"10.1", null, "10.1"},
new Object[]{"2", "", "2"},
new Object[]{"abc", null, "abc"},
new Object[]{"def", "abc", "def"}
);
}
testQuery(
"SELECT a.dim1, a.e_dim2, b.dim1 "
+ "FROM ("
+ " SELECT dim1, EARLIEST(dim2) AS e_dim2 "
+ " FROM foo "
+ " GROUP BY 1 "
+ " LIMIT 100"
+ ") a "
+ "INNER JOIN foo b ON a.dim1 = b.dim1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new QueryDataSource(
new TopNQueryBuilder()
.dataSource("foo")
.dimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
.metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))
.threshold(100)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.granularity(Granularities.ALL)
.aggregators(
new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024)
)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("dim1")
.build()
),
"j0.",
"(\"d0\" == \"j0.dim1\")",
JoinType.INNER,
null,
TestExprMacroTable.INSTANCE,
null
)
)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("a0", "d0", "j0.dim1")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@MethodSource("constructorFeeder")
@ParameterizedTest(name = "{0}")
public void testTimeseriesSubqueryWithEarliestAggregator(String testName, Map<String, Object> queryContext)
{
testQuery(
"SELECT a.__time, a.e_dim2, b.__time "
+ "FROM ("
+ " SELECT TIME_FLOOR(\"__time\", 'PT24H') as __time, EARLIEST(dim2) AS e_dim2 "
+ " FROM foo "
+ " GROUP BY 1 "
+ ") a "
+ "INNER JOIN foo b ON a.__time = b.__time",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new QueryDataSource(
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Intervals.ETERNITY))
.granularity(new PeriodGranularity(
new Period("PT24H"),
null,
DateTimeZone.UTC
))
.aggregators(new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024))
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("__time")
.build()
),
"j0.",
"(\"d0\" == \"j0.__time\")",
JoinType.INNER,
null,
TestExprMacroTable.INSTANCE,
null
)
)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.columns("a0", "d0", "j0.__time")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{946684800000L, "a", 946684800000L},
new Object[]{946771200000L, NullHandling.defaultStringValue(), 946771200000L},
new Object[]{946857600000L, "", 946857600000L},
new Object[]{978307200000L, "a", 978307200000L},
new Object[]{978393600000L, "abc", 978393600000L},
new Object[]{978480000000L, NullHandling.defaultStringValue(), 978480000000L}
)
);
}
public static class SubqueryComponentSupplier extends SqlTestFramework.StandardComponentSupplier
{