diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 0615ba802bf..5c491c0780d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -571,7 +571,7 @@ public class ControllerImpl implements Controller final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( - context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig), + context.makeQueryKitSpec(makeQueryControllerToolKit(queryContext), queryId, querySpec, queryKernelConfig), querySpec, context, resultsContext @@ -1211,13 +1211,19 @@ public class ControllerImpl implements Controller } @SuppressWarnings("rawtypes") - private QueryKit> makeQueryControllerToolKit() + private QueryKit> makeQueryControllerToolKit(QueryContext queryContext) { final Map, QueryKit> kitMap = ImmutableMap., QueryKit>builder() .put(ScanQuery.class, new ScanQueryKit(context.jsonMapper())) .put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper())) - .put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper())) + .put( + WindowOperatorQuery.class, + new WindowOperatorQueryKit( + context.jsonMapper(), + MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext) + ) + ) .build(); return new MultiQueryKit(kitMap); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 41c1df884f9..674e3ea4602 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -40,13 +40,9 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; -import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; -import org.apache.druid.query.operator.AbstractSortOperatorFactory; -import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; -import org.apache.druid.query.operator.PartitionSortOperatorFactory; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; @@ -101,7 +97,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor this.frameWriterFactory = frameWriterFactory; this.resultRowAndCols = new ArrayList<>(); this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext); - this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList); + this.operatorFactoryList = operatorFactoryList; this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized); this.frameReader = frameReader; @@ -403,36 +399,4 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor resultRowAndCols.clear(); rowId.set(0); } - - /** - * This method converts the operator chain received from native plan into MSQ plan. 
- * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). - * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. - * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. - * @param operatorFactoryListFromQuery - * @return - */ - private List getOperatorFactoryListForStageDefinition(List operatorFactoryListFromQuery) - { - final List operatorFactoryList = new ArrayList<>(); - final List sortOperatorFactoryList = new ArrayList<>(); - for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { - if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { - AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; - operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), this.maxRowsMaterialized)); - } else if (operatorFactory instanceof AbstractSortOperatorFactory) { - AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; - sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); - } else { - // Add all the PartitionSortOperator(s) before every window operator. 
- operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - operatorFactoryList.add(operatorFactory); - } - } - - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - return operatorFactoryList; - } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b168860ff4c..a46f62866a1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -37,7 +37,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; import org.apache.druid.query.operator.AbstractSortOperatorFactory; import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.PartitionSortOperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.operator.window.WindowOperatorFactory; import org.apache.druid.segment.column.ColumnType; @@ -54,10 +56,12 @@ public class WindowOperatorQueryKit implements QueryKit { private static final Logger log = new Logger(WindowOperatorQueryKit.class); private final ObjectMapper jsonMapper; + private final boolean isOperatorTransformationEnabled; - public WindowOperatorQueryKit(ObjectMapper jsonMapper) + public WindowOperatorQueryKit(ObjectMapper jsonMapper, boolean isOperatorTransformationEnabled) { this.jsonMapper = jsonMapper; + this.isOperatorTransformationEnabled = isOperatorTransformationEnabled; } @Override @@ -172,6 +176,9 @@ public class WindowOperatorQueryKit 
implements QueryKit .flatMap(of -> of.getPartitionColumns().stream()) .collect(Collectors.toList()); + final List operatorFactories = isOperatorTransformationEnabled + ? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized) + : operatorList.get(i); queryDefBuilder.add( StageDefinition.builder(firstStageNumber + i) @@ -181,7 +188,7 @@ public class WindowOperatorQueryKit implements QueryKit .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, - operatorList.get(i), + operatorFactories, stageRowSignature, maxRowsMaterialized, partitionColumnNames @@ -325,4 +332,40 @@ public class WindowOperatorQueryKit implements QueryKit finalWindowClusterBy.getColumns() ); } + + /** + * This method converts the operator chain received from the native plan into an MSQ plan. + * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). + * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. + * This conversion allows us to blindly read N rows from the input channel and push them into the operator chain, repeating until the input channel is finished. 
+ * @param operatorFactoryListFromQuery operator factories taken from the native query plan, in their original order + * @param maxRowsMaterializedInWindow limit passed to every GlueingPartitioningOperatorFactory created by this method + * @return a new list in which partitioning factories are replaced by GlueingPartitioningOperatorFactory and sort factories by PartitionSortOperatorFactory, each sort placed directly before the next remaining operator (or at the end) + */ + private List getTransformedOperatorFactoryListForStageDefinition( + List operatorFactoryListFromQuery, + int maxRowsMaterializedInWindow + ) + { + final List operatorFactoryList = new ArrayList<>(); + final List sortOperatorFactoryList = new ArrayList<>(); + for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { + if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { + AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; + operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow)); + } else if (operatorFactory instanceof AbstractSortOperatorFactory) { + AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; + sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); + } else { + // Add all the PartitionSortOperator(s) before every window operator. + operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + operatorFactoryList.add(operatorFactory); + } + } + + operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + return operatorFactoryList; + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 7cf8201c525..5462b991737 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -289,6 +289,9 @@ public class MSQTaskQueryMaker implements QueryMaker // Add appropriate finalization to native query context. 
nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, finalizeAggregations); + // This flag is to ensure backward compatibility, as brokers are upgraded after indexers/middlemanagers. + nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true); + final MSQSpec querySpec = MSQSpec.builder() .query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides)) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 9059a81ffe3..7112a101c04 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -191,6 +191,9 @@ public class MultiStageQueryContext public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow"; + // This flag ensures backward compatibility and will be removed in Druid 33, with the default behavior as enabled. 
+ public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION = "windowFunctionOperatorTransformation"; + public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; /** @@ -217,6 +220,14 @@ public class MultiStageQueryContext ); } + public static boolean isWindowFunctionOperatorTransformationEnabled(final QueryContext queryContext) + { + return queryContext.getBoolean( + WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, + false + ); + } + public static int getMaxConcurrentStagesWithDefault( final QueryContext queryContext, final int defaultMaxConcurrentStages diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 62496abacf6..e99b571b807 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2) .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0) .put(MSQTaskQueryMaker.USER_KEY, "allowAll") + .put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true) .build(); public static final Map DURABLE_STORAGE_MSQ_CONTEXT = diff --git a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq index 502885fb3ae..3a2208d0361 100644 --- a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq +++ b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/msq1.iq @@ -95,7 +95,8 @@ order by 1; "maxParseExceptions" : 0, "plannerStrategy" : "DECOUPLED", "sqlQueryId" : __SQL_QUERY_ID__ - "sqlStringifyArrays" : false + 
"sqlStringifyArrays" : false, + "windowFunctionOperatorTransformation" : true } } }, @@ -201,7 +202,8 @@ order by 1; "maxParseExceptions" : 0, "plannerStrategy" : "DECOUPLED", "sqlQueryId" : __SQL_QUERY_ID__ - "sqlStringifyArrays" : false + "sqlStringifyArrays" : false, + "windowFunctionOperatorTransformation" : true } } },