diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java index d5b31328b0b..d0a12b3ae75 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java @@ -36,8 +36,10 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.DataSource; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference; @@ -96,7 +98,17 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor final long memoryReservedForBroadcastJoin ) { - if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { + // An UnnestDataSource or FilteredDataSource can have a join as a base + // In such a case a side channel is expected to be there + final DataSource baseDataSource; + if (dataSource instanceof UnnestDataSource) { + baseDataSource = ((UnnestDataSource) dataSource).getBase(); + } else if (dataSource instanceof FilteredDataSource) { + baseDataSource = ((FilteredDataSource) dataSource).getBase(); + } else { + baseDataSource = dataSource; + } + if (!(baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) { throw new ISE("Did not expect side channels for dataSource [%s]", dataSource); } @@ -106,8 +118,8 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor if (baseInput.hasChannel()) { inputChannels.add(baseInput.getChannel()); } - - if (dataSource instanceof JoinDataSource) { + + if (baseDataSource instanceof JoinDataSource) { final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap(); final List channelReaders = new ArrayList<>(); @@ -196,7 +208,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor if (segmentMapFn != null) { return true; } else if (broadcastJoinHelper == null) { - segmentMapFn = Function.identity(); + segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator); return true; } else { final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index a826f1928e6..d8481bf7a09 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -44,12 +44,14 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory; import org.apache.druid.query.DataSource; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; @@ -135,8 +137,29 @@ public class DataSourcePlan checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forInline((InlineDataSource) dataSource, broadcast); } else if (dataSource instanceof LookupDataSource) { - checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forLookup((LookupDataSource) dataSource, broadcast); + } else if (dataSource instanceof FilteredDataSource) { + return forFilteredDataSource( + queryKit, + queryId, + queryContext, + (FilteredDataSource) dataSource, + querySegmentSpec, + maxWorkerCount, + minStageNumber, + broadcast + ); + } else if (dataSource instanceof UnnestDataSource) { + return forUnnest( + queryKit, + queryId, + queryContext, + (UnnestDataSource) dataSource, + querySegmentSpec, + maxWorkerCount, + minStageNumber, + broadcast + ); } else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( @@ -353,6 +376,88 @@ public class DataSourcePlan ); } + private static DataSourcePlan forFilteredDataSource( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final FilteredDataSource dataSource, + final QuerySegmentSpec querySegmentSpec, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + final DataSourcePlan basePlan = forDataSource( + queryKit, + queryId, + queryContext, + dataSource.getBase(), + querySegmentSpec, + null, + maxWorkerCount, + minStageNumber, + broadcast + ); + + DataSource newDataSource = basePlan.getNewDataSource(); + + final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); + newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter()); + return new DataSourcePlan( + newDataSource, + inputSpecs, + basePlan.getBroadcastInputs(), + basePlan.getSubQueryDefBuilder().orElse(null) + ); + + } + + /** + * Build a plan for Unnest data source + */ + private static DataSourcePlan forUnnest( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final UnnestDataSource dataSource, + final QuerySegmentSpec querySegmentSpec, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + // Find the plan for base data source by recursing + final DataSourcePlan basePlan = forDataSource( + queryKit, + queryId, + queryContext, + dataSource.getBase(), + querySegmentSpec, + null, + maxWorkerCount, + minStageNumber, + broadcast + ); + DataSource newDataSource = basePlan.getNewDataSource(); + + final List inputSpecs = new ArrayList<>(basePlan.getInputSpecs()); + + // Create the new data source using the data source from the base plan + newDataSource = UnnestDataSource.create( + newDataSource, + dataSource.getVirtualColumn(), + dataSource.getUnnestFilter() + ); + // The base data source can be a join and might already have broadcast inputs + // Need to set the broadcast inputs from the basePlan + return new DataSourcePlan( + newDataSource, + inputSpecs, + basePlan.getBroadcastInputs(), + basePlan.getSubQueryDefBuilder().orElse(null) + ); + } + /** * Build a plan for broadcast hash-join. */ @@ -379,7 +484,6 @@ public class DataSourcePlan null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly. maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - broadcast ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index ee7a590e1f0..e6578388a40 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -114,8 +114,8 @@ public class MSQTaskSqlEngine implements SqlEngine case TIME_BOUNDARY_QUERY: case GROUPING_SETS: case WINDOW_FUNCTIONS: - case UNNEST: return false; + case UNNEST: case CAN_SELECT: case CAN_INSERT: case CAN_REPLACE: diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 009f595bf31..81ba53f755e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -185,6 +185,95 @@ public class MSQInsertTest extends MSQTestBase } + @Test + public void testInsertWithUnnestInline() + { + List expectedRows = ImmutableList.of( + new Object[]{1692226800000L, 1L}, + new Object[]{1692226800000L, 2L}, + new Object[]{1692226800000L, 3L} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.LONG) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + + @Test + public void testInsertWithUnnest() + { + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select __time, d from foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + + @Test + public void testInsertWithUnnestWithVirtualColumns() + { + List expectedRows = ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978480000000L, 6.0f}, + new Object[]{978480000000L, 6.0f} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.FLOAT) + .build(); + + + testIngestQuery().setSql( + "insert into foo1 select __time, d from foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL") + .setQueryContext(context) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + @Test public void testInsertOnExternalDataSource() throws IOException { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 80d6719f121..0a43fdaea72 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -739,6 +739,182 @@ public class MSQReplaceTest extends MSQTestBase .verifyPlanningErrors(); } + @Test + public void testReplaceUnnestSegmentEntireTable() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE ALL " + + "SELECT __time, d " + + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) " + + "PARTITIONED BY ALL TIME ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-01T/P1M"), + "test", + 0 + ))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(8).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(8).frames(1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(8), + 1, 0 + ) + .verifyResults(); + } + + @Test + public void testReplaceUnnestWithVirtualColumnSegmentEntireTable() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.FLOAT) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE ALL " + + "SELECT __time, d " + + "FROM foo, UNNEST(ARRAY[m1, m2]) as unnested(d) " + + "PARTITIONED BY ALL TIME ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY) + .setExpectedSegment(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-01T/P1M"), + "test", + 0 + ))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, 1.0f}, + new Object[]{946684800000L, 1.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946771200000L, 2.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{946857600000L, 3.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978307200000L, 4.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978393600000L, 5.0f}, + new Object[]{978480000000L, 6.0f}, + new Object[]{978480000000L, 6.0f} + ) + ) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(12).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(12).frames(1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(12), + 1, 0 + ) + .verifyResults(); + } + + @Test + public void testReplaceUnnestSegmentWithTimeFilter() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("d", ColumnType.STRING) + .build(); + + testIngestQuery().setSql(" REPLACE INTO foo " + + "OVERWRITE WHERE __time >= TIMESTAMP '1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + + "SELECT __time, d " + + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) " + + "PARTITIONED BY DAY CLUSTERED BY d ") + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of( + "1999-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z"))) + .setExpectedShardSpec(DimensionRangeShardSpec.class) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{946684800000L, "a"}, + new Object[]{946684800000L, "b"}, + new Object[]{946771200000L, "b"}, + new Object[]{946771200000L, "c"}, + new Object[]{946857600000L, "d"}, + new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null}, + new Object[]{978393600000L, null}, + new Object[]{978480000000L, null} + ) + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(8), + 1, 0 + ) + .verifyResults(); + } + @Test public void testReplaceTombstonesOverPartiallyOverlappingSegments() { @@ -755,7 +931,9 @@ public class MSQReplaceTest extends MSQTestBase .dataSource("foo1") .build(); - Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); List expectedResults; if (NullHandling.sqlCompatible()) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index ae0bfa71f1e..6b6f8ff356e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; @@ -136,6 +137,7 @@ public class MSQSelectTest extends MSQTestBase {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT}, {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT} }; + return Arrays.asList(data); } @@ -2119,6 +2121,207 @@ public class MSQSelectTest extends MSQTestBase .verifyResults(); } + @Test + public void testSelectUnnestOnInlineFoo() + { + RowSignature resultSignature = RowSignature.builder() + .add("EXPR$0", ColumnType.LONG) + .build(); + RowSignature outputSignature = RowSignature.builder() + .add("d", ColumnType.LONG) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("EXPR$0", "d") + ) + ); + + testSelectQuery() + .setSql("select d from UNNEST(ARRAY[1,2,3]) as unnested(d)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource( + InlineDataSource.fromIterable( + ImmutableList.of( + new Object[]{1L}, + new Object[]{2L}, + new Object[]{3L} + ), + resultSignature + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("EXPR$0") + .context(defaultScanQueryContext( + context, + resultSignature + )) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1}, + new Object[]{2}, + new Object[]{3} + )) + .verifyResults(); + } + + + @Test + public void testSelectUnnestOnFoo() + { + RowSignature resultSignature = RowSignature.builder() + .add("j0.unnest", ColumnType.STRING) + .build(); + + RowSignature outputSignature = RowSignature.builder() + .add("d3", ColumnType.STRING) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("j0.unnest", "d3") + ) + ); + + testSelectQuery() + .setSql("SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE1), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + resultSignature + )) + .columns(ImmutableList.of("j0.unnest")) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows( + useDefault ? ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + ) : ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{null}, + new Object[]{null} + )) + .verifyResults(); + } + + @Test + public void testSelectUnnestOnQueryFoo() + { + RowSignature resultSignature = RowSignature.builder() + .add("j0.unnest", ColumnType.STRING) + .build(); + + RowSignature resultSignature1 = RowSignature.builder() + .add("dim3", ColumnType.STRING) + .build(); + + RowSignature outputSignature = RowSignature.builder() + .add("d3", ColumnType.STRING) + .build(); + + final ColumnMappings expectedColumnMappings = new ColumnMappings( + ImmutableList.of( + new ColumnMapping("j0.unnest", "d3") + ) + ); + + testSelectQuery() + .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a' LIMIT 10), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + new TableDataSource(CalciteTests.DATASOURCE1) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .filters(equality("dim2", "a", ColumnType.STRING)) + .columns("dim3") + .context(defaultScanQueryContext( + context, + resultSignature1 + )) + .limit(10) + .build() + ), + expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING), + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + resultSignature + )) + .columns(ImmutableList.of("j0.unnest")) + .build()) + .columnMappings(expectedColumnMappings) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(outputSignature) + .setQueryContext(context) + .setExpectedResultRows( + useDefault ? ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"} + ) : ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""} + )) + .verifyResults(); + } + @Nonnull private List expectedMultiValueFooRowsGroup() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java new file mode 100644 index 00000000000..16aef6bdcaa --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.msq.exec.WorkerMemoryParameters; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.sql.calcite.CalciteArraysQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.junit.After; +import org.junit.Before; + +/** + * Runs {@link CalciteArraysQueryTest} but with MSQ engine + */ +public class CalciteArraysSelectQueryMSQTest extends CalciteArraysQueryTest +{ + private TestGroupByBuffers groupByBuffers; + + @Before + public void setup2() + { + groupByBuffers = TestGroupByBuffers.createDefault(); + } + + @After + public void teardown2() + { + groupByBuffers.close(); + } + + @Override + public void configureGuice(DruidInjectorBuilder builder) + { + super.configureGuice(builder); + builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0])); + } + + + @Override + public SqlEngine createEngine( + QueryLifecycleFactory qlf, + ObjectMapper queryJsonMapper, + Injector injector + ) + { + final WorkerMemoryParameters workerMemoryParameters = + WorkerMemoryParameters.createInstance( + WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50, + 2, + 10, + 2, + 0, + 0 + ); + final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient( + queryJsonMapper, + injector, + new MSQTestTaskActionClient(queryJsonMapper), + workerMemoryParameters + ); + return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper); + } + + @Override + protected QueryTestBuilder testBuilder() + { + return new QueryTestBuilder(new CalciteTestConfig(true)) + .addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient())) + .skipVectorize(true) + .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate()) + .msqCompatible(msqCompatible); + } +} + diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 0c840b14a88..d7c0ea1f2d5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -89,10 +89,13 @@ import java.util.function.Supplier; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; +import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; +import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2; +import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS; /** * Helper class aiding in wiring up the Guice bindings required for MSQ engine to work with the Calcite's tests @@ -246,6 +249,15 @@ public class CalciteMSQTestsHelper .rows(ROWS1_WITH_NUMERIC_DIMS) .buildMMappedIndex(); break; + case DATASOURCE5: + index = IndexBuilder + .create() + .tmpDir(new File(temporaryFolder.newFolder(), "5")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(INDEX_SCHEMA_LOTS_O_COLUMNS) + .rows(ROWS_LOTS_OF_COLUMNS) + .buildMMappedIndex(); + break; default: throw new ISE("Cannot query segment %s in test runner", segmentId); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index a6f98b3ba85..a478d1c3c17 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -119,7 +119,8 @@ public class MSQTestWorkerContext implements WorkerContext IndexMergerV9 indexMerger = new IndexMergerV9( mapper, indexIO, - OffHeapMemorySegmentWriteOutMediumFactory.instance() + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true ); final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index d72b577ef3f..9141c4db090 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -82,11 +82,6 @@ public class PlannerContext */ public static final String CTX_ENABLE_WINDOW_FNS = "windowsAreForClosers"; - /** - * Undocumented context key, used to enable {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#UNNEST}. - */ - public static final String CTX_ENABLE_UNNEST = "enableUnnest"; - public static final String CTX_SQL_USE_BOUNDS_AND_SELECTORS = "sqlUseBoundAndSelectors"; public static final boolean DEFAULT_SQL_USE_BOUNDS_AND_SELECTORS = NullHandling.replaceWithDefault(); @@ -527,13 +522,6 @@ public class PlannerContext // Short-circuit: feature requires context flag. return false; } - - if (feature == EngineFeature.UNNEST && - !QueryContexts.getAsBoolean(CTX_ENABLE_UNNEST, queryContext.get(CTX_ENABLE_UNNEST), false)) { - // Short-circuit: feature requires context flag. - return false; - } - return engine.featureAvailable(feature, this); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 526eb6a976c..8ca4ab076d9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -115,6 +115,7 @@ public class DruidRules retVal.add(DruidOuterQueryRule.WINDOW); } + // Adding unnest specific rules if (plannerContext.featureAvailable(EngineFeature.UNNEST)) { retVal.add(new DruidUnnestRule(plannerContext)); retVal.add(new DruidCorrelateUnnestRule(plannerContext)); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 47dae5c4d97..cd719d7f29f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -64,7 +64,6 @@ public class ViewSqlEngine implements SqlEngine case WINDOW_FUNCTIONS: case UNNEST: return true; - // Views can't sit on top of INSERT or REPLACE. case CAN_INSERT: case CAN_REPLACE: diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index e72537c7da5..84fd4217c75 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -203,7 +203,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) - .put(PlannerContext.CTX_ENABLE_UNNEST, true) .build(); public static final Map QUERY_CONTEXT_NO_STRINGIFY_ARRAY_USE_EQUALITY = diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index df4e9b62cc9..ae4437faf69 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -60,7 +60,6 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.util.CalciteTests; import org.junit.Assert; import org.junit.Test; @@ -78,7 +77,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest private static final Map QUERY_CONTEXT_UNNEST = ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) - .put(PlannerContext.CTX_ENABLE_UNNEST, true) .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) .build(); @@ -87,6 +85,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testSelectConstantArrayExpressionFromTable() { + notMsqCompatible(); testQuery( "SELECT ARRAY[1,2] as arr, dim1 FROM foo LIMIT 1", ImmutableList.of( @@ -168,6 +167,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testSelectNonConstantArrayExpressionFromTableForMultival() { + notMsqCompatible(); final String sql = "SELECT ARRAY[CONCAT(dim3, 'word'),'up'] as arr, dim1 FROM foo LIMIT 5"; final Query scanQuery = newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -207,6 +207,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest // Yes these outputs are strange sometimes, arrays are in a partial state of existence so end up a bit // stringy for now this is because virtual column selectors are coercing values back to stringish so that // multi-valued string dimensions can be grouped on. + notMsqCompatible(); List expectedResults; if (useDefault) { expectedResults = ImmutableList.of( @@ -386,6 +387,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest // which will still always be stringified to ultimately adhere to the varchar type // as array support increases in the engine this will likely change since using explict array functions should // probably kick it into an array + notMsqCompatible(); List expectedResults; if (useDefault) { expectedResults = ImmutableList.of( @@ -1017,6 +1019,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayGroupAsLongArray() { + notMsqCompatible(); // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays cannotVectorize(); testQuery( @@ -1068,6 +1071,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest { // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys cannotVectorize(); + notMsqCompatible(); testQuery( "SELECT ARRAY[d1], SUM(cnt) FROM druid.numfoo GROUP BY 1 ORDER BY 2 DESC", QUERY_CONTEXT_NO_STRINGIFY_ARRAY, @@ -1115,6 +1119,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayGroupAsFloatArray() { + notMsqCompatible(); // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys cannotVectorize(); testQuery( @@ -1605,6 +1610,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayAggNumeric() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_AGG(l1), ARRAY_AGG(DISTINCT l1), ARRAY_AGG(d1), ARRAY_AGG(DISTINCT d1), ARRAY_AGG(f1), ARRAY_AGG(DISTINCT f1) FROM numfoo", @@ -1741,6 +1747,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayAggQuantile() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_QUANTILE(ARRAY_AGG(l1), 0.9) FROM numfoo", @@ -1784,6 +1791,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayAggArrays() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_AGG(ARRAY[l1, l2]), ARRAY_AGG(DISTINCT ARRAY[l1, l2]) FROM numfoo", @@ -1880,6 +1888,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayConcatAggArrays() { + notMsqCompatible(); cannotVectorize(); testQuery( "SELECT ARRAY_CONCAT_AGG(ARRAY[l1, l2]), ARRAY_CONCAT_AGG(DISTINCT ARRAY[l1, l2]) FROM numfoo", @@ -2028,6 +2037,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest public void testArrayAggMaxBytes() { cannotVectorize(); + notMsqCompatible(); testQuery( "SELECT ARRAY_AGG(l1, 128), ARRAY_AGG(DISTINCT l1, 128) FROM numfoo", ImmutableList.of( @@ -2227,6 +2237,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayAggGroupByArrayAggOfLongsFromSubquery() { + notMsqCompatible(); requireMergeBuffers(3); cannotVectorize(); testQuery( @@ -2366,6 +2377,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testArrayAggGroupByArrayAggOfDoubleFromSubquery() { + notMsqCompatible(); requireMergeBuffers(3); cannotVectorize(); testQuery( @@ -2883,6 +2895,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testUnnestThriceWithFiltersOnDimAndUnnestCol() { + notMsqCompatible(); cannotVectorize(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" @@ -2981,6 +2994,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest @Test public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumns() { + notMsqCompatible(); cannotVectorize(); String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 FROM \n" + " ( SELECT * FROM \n" diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 272fddbd8a4..46fb40fddad 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -83,6 +83,7 @@ public class IngestionTestSqlEngine implements SqlEngine case TOPN_QUERY: case TIME_BOUNDARY_QUERY: case SCAN_NEEDS_SIGNATURE: + case UNNEST: return false; case CAN_INSERT: case CAN_REPLACE: diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java index 52e52ec7f89..48e7ee2423b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java @@ -42,6 +42,7 @@ import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider; +import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; import org.apache.druid.sql.calcite.schema.DruidSchemaName; import org.apache.druid.sql.calcite.schema.NamedSchema; @@ -89,6 +90,8 @@ public class CalcitePlannerModuleTest extends CalciteTestBase @Mock private DruidSchemaCatalog rootSchema; + @Mock + private SqlEngine engine; private Set aggregators; private Set operatorConversions; @@ -185,13 +188,16 @@ public class CalcitePlannerModuleTest extends CalciteTestBase CalciteTests.TEST_AUTHORIZER_MAPPER, AuthConfig.newBuilder().build() ); + + PlannerContext context = PlannerContext.create( toolbox, "SELECT 1", - null, + engine, Collections.emptyMap(), null ); + boolean containsCustomRule = injector.getInstance(CalciteRulesManager.class) .druidConventionRuleSet(context) .contains(customRule); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java index b2f93340dbf..c6f05697026 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java @@ -222,7 +222,7 @@ public class TestDataBuilder .withRollup(false) .build(); - private static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder() + public static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder() .withMetrics( new CountAggregatorFactory("count") )