MSQ: Fix issue with rollup ingestion and aggregators with multiple names. (#14367)

The same aggregator can have two output names for a SQL like:

  INSERT INTO foo
  SELECT x, COUNT(*) AS y, COUNT(*) AS z
  FROM t
  GROUP BY 1
  PARTITIONED BY ALL

In this case, the SQL planner will create a query with a single "count"
aggregator mapped to output names "y" and "z". The prior MSQ code did
not properly handle this case, instead throwing an error like:

  Expected single output for query column[a0] but got [[1, 2]]
This commit is contained in:
Gian Merlino 2023-06-05 21:58:41 -07:00 committed by GitHub
parent c14e54cf93
commit a0d49baad6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 12 deletions

View File

@ -1859,10 +1859,7 @@ public class ControllerImpl implements Controller
if (isRollupQuery) { if (isRollupQuery) {
// Populate aggregators from the native query when doing an ingest in rollup mode. // Populate aggregators from the native query when doing an ingest in rollup mode.
for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) {
final int outputColumn = CollectionUtils.getOnlyElement( for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) {
columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()),
xs -> new ISE("Expected single output for query column[%s] but got[%s]", aggregatorFactory.getName(), xs)
);
final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); final String outputColumnName = columnMappings.getOutputColumnName(outputColumn);
if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
throw new ISE("There can only be one aggregation for column [%s].", outputColumn); throw new ISE("There can only be one aggregation for column [%s].", outputColumn);
@ -1874,6 +1871,7 @@ public class ControllerImpl implements Controller
} }
} }
} }
}
// Each column can be of either time, dimension, aggregator. For this method. we can ignore the time column. // Each column can be of either time, dimension, aggregator. For this method. we can ignore the time column.
// For non-complex columns, If the aggregator factory of the column is not available, we treat the column as // For non-complex columns, If the aggregator factory of the column is not available, we treat the column as

View File

@ -37,6 +37,7 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
@ -295,6 +296,45 @@ public class MSQInsertTest extends MSQTestBase
} }
@Test
public void testInsertOnFoo1WithTwoCountAggregatorsWithRollupContext()
{
final List<Object[]> expectedRows = expectedFooRows();
// Add 1L to each expected row, since we have two count aggregators.
for (int i = 0; i < expectedRows.size(); i++) {
final Object[] expectedRow = expectedRows.get(i);
final Object[] newExpectedRow = new Object[expectedRow.length + 1];
System.arraycopy(expectedRow, 0, newExpectedRow, 0, expectedRow.length);
newExpectedRow[expectedRow.length] = 1L;
expectedRows.set(i, newExpectedRow);
}
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("cnt2", ColumnType.LONG)
.build();
testIngestQuery().setSql(
"insert into foo1\n"
+ "select __time, dim1 , count(*) as cnt, count(*) as cnt2\n"
+ "from foo\n"
+ "where dim1 is not null\n"
+ "group by 1, 2\n"
+ "PARTITIONED by All")
.setExpectedDataSource("foo1")
.setQueryContext(QueryContexts.override(context, ROLLUP_CONTEXT_PARAMS))
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedResultRows(expectedRows)
.setExpectedRollUp(true)
.addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt"))
.addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt2", "cnt2"))
.verifyResults();
}
@Test @Test
public void testInsertOnFoo1WithGroupByLimitWithClusterBy() public void testInsertOnFoo1WithGroupByLimitWithClusterBy()
{ {