From a0d49baad6a23f58b6591f78358baf2887339364 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 5 Jun 2023 21:58:41 -0700 Subject: [PATCH] MSQ: Fix issue with rollup ingestion and aggregators with multiple names. (#14367) The same aggregator can have two output names for a SQL like: INSERT INTO foo SELECT x, COUNT(*) AS y, COUNT(*) AS z FROM t GROUP BY 1 PARTITIONED BY ALL In this case, the SQL planner will create a query with a single "count" aggregator mapped to output names "y" and "z". The prior MSQ code did not properly handle this case, instead throwing an error like: Expected single output for query column[a0] but got [[1, 2]] --- .../apache/druid/msq/exec/ControllerImpl.java | 22 +++++----- .../apache/druid/msq/exec/MSQInsertTest.java | 40 +++++++++++++++++++ 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b39249d8111..db7a0f38b00 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1859,18 +1859,16 @@ public class ControllerImpl implements Controller if (isRollupQuery) { // Populate aggregators from the native query when doing an ingest in rollup mode. for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { - final int outputColumn = CollectionUtils.getOnlyElement( - columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()), - xs -> new ISE("Expected single output for query column[%s] but got[%s]", aggregatorFactory.getName(), xs) - ); - final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); - if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { - throw new ISE("There can only be one aggregation for column [%s].", outputColumn); - } else { - outputColumnAggregatorFactories.put( - outputColumnName, - aggregatorFactory.withName(outputColumnName).getCombiningFactory() - ); + for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) { + final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); + if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { + throw new ISE("There can only be one aggregation for column [%s].", outputColumn); + } else { + outputColumnAggregatorFactories.put( + outputColumnName, + aggregatorFactory.withName(outputColumnName).getCombiningFactory() + ); + } } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 479bc4de0d2..548e5e01664 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -37,6 +37,7 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.NestedDataTestUtils; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.segment.column.ColumnType; @@ -295,6 +296,45 @@ public class MSQInsertTest extends MSQTestBase } + @Test + public void testInsertOnFoo1WithTwoCountAggregatorsWithRollupContext() + { + final List expectedRows = expectedFooRows(); + + // Add 1L to each expected row, since we have two count aggregators. + for (int i = 0; i < expectedRows.size(); i++) { + final Object[] expectedRow = expectedRows.get(i); + final Object[] newExpectedRow = new Object[expectedRow.length + 1]; + System.arraycopy(expectedRow, 0, newExpectedRow, 0, expectedRow.length); + newExpectedRow[expectedRow.length] = 1L; + expectedRows.set(i, newExpectedRow); + } + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("cnt2", ColumnType.LONG) + .build(); + + testIngestQuery().setSql( + "insert into foo1\n" + + "select __time, dim1 , count(*) as cnt, count(*) as cnt2\n" + + "from foo\n" + + "where dim1 is not null\n" + + "group by 1, 2\n" + + "PARTITIONED by All") + .setExpectedDataSource("foo1") + .setQueryContext(QueryContexts.override(context, ROLLUP_CONTEXT_PARAMS)) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0))) + .setExpectedResultRows(expectedRows) + .setExpectedRollUp(true) + .addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt")) + .addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt2", "cnt2")) + .verifyResults(); + } + @Test public void testInsertOnFoo1WithGroupByLimitWithClusterBy() {