mirror of https://github.com/apache/druid.git
SQL: Fix post-aggregator naming logic for sort-project. (#6250)
The old code assumed that every post-aggregator name consists of a one-character prefix followed by a number. This isn't always true (we may pad the prefix with underscores to avoid conflicts). Instead, the new code uses a different base prefix for sort-project post-aggregators ("s" instead of "p") and uses the usual Calcites.findUnusedPrefix function to avoid conflicts.
This commit is contained in:
parent
a879022bc8
commit
80224df36a
|
@ -89,7 +89,6 @@ import javax.annotation.Nullable;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -298,7 +297,7 @@ public class DruidQuery
|
|||
plannerContext,
|
||||
aggregateRowSignature,
|
||||
aggregateProject,
|
||||
0
|
||||
"p"
|
||||
);
|
||||
projectRowOrderAndPostAggregations.postAggregations.forEach(
|
||||
postAggregator -> aggregations.add(Aggregation.create(postAggregator))
|
||||
|
@ -337,17 +336,11 @@ public class DruidQuery
|
|||
if (sortProject == null) {
|
||||
return null;
|
||||
} else {
|
||||
final List<PostAggregator> postAggregators = grouping.getPostAggregators();
|
||||
final OptionalInt maybeMaxCounter = postAggregators
|
||||
.stream()
|
||||
.mapToInt(postAggregator -> Integer.parseInt(postAggregator.getName().substring(1)))
|
||||
.max();
|
||||
|
||||
final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations(
|
||||
plannerContext,
|
||||
sortingInputRowSignature,
|
||||
sortProject,
|
||||
maybeMaxCounter.orElse(-1) + 1 // 0 if max doesn't exist
|
||||
"s"
|
||||
);
|
||||
|
||||
return new SortProject(
|
||||
|
@ -374,12 +367,17 @@ public class DruidQuery
|
|||
PlannerContext plannerContext,
|
||||
RowSignature inputRowSignature,
|
||||
Project project,
|
||||
int outputNameCounter
|
||||
String basePrefix
|
||||
)
|
||||
{
|
||||
final List<String> rowOrder = new ArrayList<>();
|
||||
final List<PostAggregator> aggregations = new ArrayList<>();
|
||||
final String outputNamePrefix = Calcites.findUnusedPrefix(
|
||||
basePrefix,
|
||||
new TreeSet<>(inputRowSignature.getRowOrder())
|
||||
);
|
||||
|
||||
int outputNameCounter = 0;
|
||||
for (final RexNode postAggregatorRexNode : project.getChildExps()) {
|
||||
// Attempt to convert to PostAggregator.
|
||||
final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
|
||||
|
@ -397,7 +395,7 @@ public class DruidQuery
|
|||
// (There might be a SQL-level type cast that we don't care about)
|
||||
rowOrder.add(postAggregatorExpression.getDirectColumn());
|
||||
} else {
|
||||
final String postAggregatorName = "p" + outputNameCounter++;
|
||||
final String postAggregatorName = outputNamePrefix + outputNameCounter++;
|
||||
final PostAggregator postAggregator = new ExpressionPostAggregator(
|
||||
postAggregatorName,
|
||||
postAggregatorExpression.getExpression(),
|
||||
|
|
|
@ -4397,6 +4397,69 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
/**
 * Plans a SQL query that wraps a max/min/avg/count-over-daily-counts aggregate in an outer
 * {@code SELECT * ... LIMIT 1} and checks both the native query and the result row.
 *
 * The expected native query is a nested GroupByQuery:
 * - the inner query groups "foo" by {@code timestamp_floor("__time", 'P1D')} (dimension "d0")
 *   and counts rows into "a0";
 * - the outer query aggregates over "a0" and "d0" using "_a"-prefixed aggregator names,
 *   derives the average "_a2" as a quotient of "_a2:sum" and "_a2:count", and applies limit 1.
 *
 * The {@code timestamp_extract} expression post-aggregator is expected to be named "s0".
 * NOTE(review): presumably it comes from the sort-project stage, which passes the "s" base
 * prefix when computing post-aggregations -- confirm against DruidQuery.
 *
 * Expected result: a single row {1L, 1L, 1L, 978480000L, 6L}.
 */
@Test
|
||||
public void testMinMaxAvgDailyCountWithLimit() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT * FROM ("
|
||||
+ " SELECT max(cnt), min(cnt), avg(cnt), TIME_EXTRACT(max(t), 'EPOCH') last_time, count(1) num_days FROM (\n"
|
||||
+ " SELECT TIME_FLOOR(__time, 'P1D') AS t, count(1) cnt\n"
|
||||
+ " FROM \"foo\"\n"
|
||||
+ " GROUP BY 1\n"
|
||||
+ " )"
|
||||
+ ") LIMIT 1\n",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(
|
||||
new QueryDataSource(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setVirtualColumns(
|
||||
EXPRESSION_VIRTUAL_COLUMN(
|
||||
"d0:v",
|
||||
"timestamp_floor(\"__time\",'P1D',null,'UTC')",
|
||||
ValueType.LONG
|
||||
)
|
||||
)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
|
||||
.setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0")))
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setAggregatorSpecs(AGGS(
|
||||
new LongMaxAggregatorFactory("_a0", "a0"),
|
||||
new LongMinAggregatorFactory("_a1", "a0"),
|
||||
new LongSumAggregatorFactory("_a2:sum", "a0"),
|
||||
new CountAggregatorFactory("_a2:count"),
|
||||
new LongMaxAggregatorFactory("_a3", "d0"),
|
||||
new CountAggregatorFactory("_a4")
|
||||
))
|
||||
.setPostAggregatorSpecs(
|
||||
ImmutableList.of(
|
||||
new ArithmeticPostAggregator(
|
||||
"_a2",
|
||||
"quotient",
|
||||
ImmutableList.of(
|
||||
new FieldAccessPostAggregator(null, "_a2:sum"),
|
||||
new FieldAccessPostAggregator(null, "_a2:count")
|
||||
)
|
||||
),
|
||||
// The expression post-aggregator is expected under the "s" prefix ("s0"), not "p0".
EXPRESSION_POST_AGG("s0", "timestamp_extract(\"_a3\",'EPOCH','UTC')")
|
||||
)
|
||||
)
|
||||
.setLimit(1)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(new Object[]{1L, 1L, 1L, 978480000L, 6L})
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvgDailyCountDistinct() throws Exception
|
||||
{
|
||||
|
@ -6997,7 +7060,7 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2"))
|
||||
)
|
||||
.setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG(
|
||||
"p0",
|
||||
"s0",
|
||||
"(\"a1\" / \"a0\")"
|
||||
)))
|
||||
.setLimitSpec(
|
||||
|
|
Loading…
Reference in New Issue