From c571e6905d035730e8158638202b7c6dc218effb Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 12 Nov 2024 14:18:16 +0530 Subject: [PATCH] Refactor WindowOperatorQueryKit to use WindowStage class for representing different window stages (#17158) --- .../msq/querykit/WindowOperatorQueryKit.java | 544 ++++++++++-------- 1 file changed, 313 insertions(+), 231 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index a46f62866a1..ca9cacef7bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -33,6 +33,7 @@ import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; import org.apache.druid.query.operator.AbstractSortOperatorFactory; @@ -50,7 +51,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class WindowOperatorQueryKit implements QueryKit { @@ -72,12 +72,6 @@ public class WindowOperatorQueryKit implements QueryKit int minStageNumber ) { - RowSignature rowSignature = originalQuery.getRowSignature(); - log.info("Row signature received for query is [%s].", rowSignature); - - List> operatorList = getOperatorListFromQuery(originalQuery); - log.info("Created operatorList with operator factories: [%s]", operatorList); - final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( queryKitSpec, originalQuery.context(), @@ -88,199 +82,348 @@ public class WindowOperatorQueryKit implements QueryKit minStageNumber, false ); + final RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder() + .get() + .build() + .getFinalStageDefinition() + .getSignature(); - ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( - operatorList.get(0), - queryKitSpec.getNumPartitionsForShuffle() + final WindowStages windowStages = new WindowStages( + originalQuery, + jsonMapper, + queryKitSpec.getNumPartitionsForShuffle(), + queryKitSpec.getMaxNonLeafWorkerCount(), + resultShuffleSpecFactory, + signatureFromInput, + isOperatorTransformationEnabled ); - final QueryDefinitionBuilder queryDefBuilder = - makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); - final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); - final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); + final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); + final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); + final int firstWindowStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); - // Get segment granularity from query context, and create ShuffleSpec and RowSignature to be used for the final window stage. - final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext()); - final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity); - final ShuffleSpec finalWindowStageShuffleSpec = resultShuffleSpecFactory.build(finalWindowClusterBy, false); - final RowSignature finalWindowStageRowSignature = computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, segmentGranularity); - - final int maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context()); - - // There are multiple windows present in the query. - // Create stages for each window in the query. - // These stages will be serialized. - // The partition by clause of the next window will be the shuffle key for the previous window. - RowSignature.Builder bob = RowSignature.builder(); - RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); log.info("Row signature received from last stage is [%s].", signatureFromInput); - for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { - bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); + // Iterate over the list of window stages, and add the definition for each window stage to QueryDefinitionBuilder. + for (int i = 0; i < windowStages.getStages().size(); i++) { + queryDefBuilder.add(windowStages.getStageDefinitionBuilder(firstWindowStageNumber + i, i)); } - - /* - operatorList is a List>, where each List corresponds to the operator factories - to be used for a different window stage. - - We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. - */ - for (int i = 0; i < operatorList.size(); i++) { - for (OperatorFactory operatorFactory : operatorList.get(i)) { - if (operatorFactory instanceof WindowOperatorFactory) { - List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); - - // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, - // since they need to be present in the row signature for this window stage. - for (String columnName : outputColumnNames) { - int indexInRowSignature = rowSignature.indexOf(columnName); - if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { - ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); - bob.add(columnName, columnType); - log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); - } else { - throw new ISE( - "Found unexpected column [%s] already present in row signature [%s].", - columnName, - rowSignature - ); - } - } - } - } - - final RowSignature intermediateSignature = bob.build(); - final RowSignature stageRowSignature; - - if (i + 1 == operatorList.size()) { - stageRowSignature = finalWindowStageRowSignature; - nextShuffleSpec = finalWindowStageShuffleSpec; - } else { - nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); - if (nextShuffleSpec == null) { - stageRowSignature = intermediateSignature; - } else { - stageRowSignature = QueryKitUtils.sortableSignature( - intermediateSignature, - nextShuffleSpec.clusterBy().getColumns() - ); - } - } - - log.info("Using row signature [%s] for window stage.", stageRowSignature); - - final List partitionColumnNames = operatorList.get(i) - .stream() - .filter(of -> of instanceof AbstractPartitioningOperatorFactory) - .map(of -> (AbstractPartitioningOperatorFactory) of) - .flatMap(of -> of.getPartitionColumns().stream()) - .collect(Collectors.toList()); - - final List operatorFactories = isOperatorTransformationEnabled - ? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized) - : operatorList.get(i); - - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + i) - .inputs(new StageInputSpec(firstStageNumber + i - 1)) - .signature(stageRowSignature) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) - .shuffleSpec(nextShuffleSpec) - .processorFactory(new WindowOperatorQueryFrameProcessorFactory( - queryToRun, - operatorFactories, - stageRowSignature, - maxRowsMaterialized, - partitionColumnNames - )) - ); - } - return queryDefBuilder.build(); } /** - * - * @param originalQuery - * @return A list of list of operator factories, where each list represents the operator factories for a particular - * window stage. + * Represents the window stages to be added to {@link QueryDefinitionBuilder}. + * This class is responsible for creating the window stages. */ - private List> getOperatorListFromQuery(WindowOperatorQuery originalQuery) + private static class WindowStages { - List> operatorList = new ArrayList<>(); - final List operators = originalQuery.getOperators(); - List currentStage = new ArrayList<>(); + private final List stages; + private final WindowOperatorQuery query; + private final int numPartitionsForShuffle; + private final int maxNonLeafWorkerCount; + private final ShuffleSpec finalWindowStageShuffleSpec; + private final RowSignature finalWindowStageRowSignature; + private final RowSignature.Builder rowSignatureBuilder; + private final boolean isOperatorTransformationEnabled; - for (int i = 0; i < operators.size(); i++) { - OperatorFactory of = operators.get(i); - currentStage.add(of); + private WindowStages( + WindowOperatorQuery query, + ObjectMapper jsonMapper, + int numPartitionsForShuffle, + int maxNonLeafWorkerCount, + ShuffleSpecFactory resultShuffleSpecFactory, + RowSignature signatureFromInput, + boolean isOperatorTransformationEnabled + ) + { + this.stages = new ArrayList<>(); + this.query = query; + this.numPartitionsForShuffle = numPartitionsForShuffle; + this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; + this.isOperatorTransformationEnabled = isOperatorTransformationEnabled; - if (of instanceof WindowOperatorFactory) { - // Process consecutive window operators - while (i + 1 < operators.size() && operators.get(i + 1) instanceof WindowOperatorFactory) { - i++; - currentStage.add(operators.get(i)); - } + final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext( + jsonMapper, + query.getContext() + ); + final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity); + this.finalWindowStageShuffleSpec = computeShuffleSpecForFinalWindowStage( + resultShuffleSpecFactory, + finalWindowClusterBy + ); + this.finalWindowStageRowSignature = computeSignatureForFinalWindowStage( + query.getRowSignature(), + finalWindowClusterBy, + segmentGranularity + ); - // Finalize the current stage - operatorList.add(new ArrayList<>(currentStage)); - currentStage.clear(); - } + this.rowSignatureBuilder = RowSignature.builder().addAll(signatureFromInput); + populateStages(); } - // There shouldn't be any operators left in currentStage. The last operator should always be WindowOperatorFactory. - if (!currentStage.isEmpty()) { - throw new ISE( - "Found unexpected operators [%s] present in the list of operators [%s].", - currentStage, - operators + private void populateStages() + { + WindowStage currentStage = new WindowStage(getMaxRowsMaterialized()); + for (OperatorFactory of : query.getOperators()) { + if (!currentStage.canAccept(of)) { + stages.add(currentStage); + currentStage = new WindowStage(getMaxRowsMaterialized()); + } + currentStage.addOperatorFactory(of); + } + if (!currentStage.getOperatorFactories().isEmpty()) { + stages.add(currentStage); + } + + log.info("Created window stages: [%s]", stages); + } + + private List getStages() + { + return stages; + } + + private RowSignature getRowSignatureForStage(int windowStageIndex, ShuffleSpec shuffleSpec) + { + if (windowStageIndex == stages.size() - 1) { + return finalWindowStageRowSignature; + } + + final WindowStage stage = stages.get(windowStageIndex); + for (WindowOperatorFactory operatorFactory : stage.getWindowOperatorFactories()) { + for (String columnName : operatorFactory.getProcessor().getOutputColumnNames()) { + int indexInRowSignature = query.getRowSignature().indexOf(columnName); + if (indexInRowSignature != -1 && rowSignatureBuilder.build().indexOf(columnName) == -1) { + ColumnType columnType = query.getRowSignature().getColumnType(indexInRowSignature).get(); + rowSignatureBuilder.add(columnName, columnType); + } + } + } + + final RowSignature intermediateSignature = rowSignatureBuilder.build(); + + final RowSignature stageRowSignature; + if (shuffleSpec == null) { + stageRowSignature = intermediateSignature; + } else { + stageRowSignature = QueryKitUtils.sortableSignature( + intermediateSignature, + shuffleSpec.clusterBy().getColumns() + ); + } + + log.info("Using row signature [%s] for window stage.", stageRowSignature); + return stageRowSignature; + } + + private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, int windowStageIndex) + { + final WindowStage stage = stages.get(windowStageIndex); + final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ? + finalWindowStageShuffleSpec : + stages.get(windowStageIndex + 1).findShuffleSpec(numPartitionsForShuffle); + + final RowSignature stageRowSignature = getRowSignatureForStage(windowStageIndex, shuffleSpec); + final List operatorFactories = isOperatorTransformationEnabled + ? stage.getTransformedOperatorFactories() + : stage.getOperatorFactories(); + + return StageDefinition.builder(stageNumber) + .inputs(new StageInputSpec(stageNumber - 1)) + .signature(stageRowSignature) + .maxWorkerCount(maxNonLeafWorkerCount) + .shuffleSpec(shuffleSpec) + .processorFactory(new WindowOperatorQueryFrameProcessorFactory( + query, + operatorFactories, + stageRowSignature, + getMaxRowsMaterialized(), + stage.getPartitionColumns() + )); + } + + /** + * Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account, + * as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}. + */ + private ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity) + { + final List clusterByColumns = Collections.singletonList(new KeyColumn( + QueryKitUtils.PARTITION_BOOST_COLUMN, + KeyOrder.ASCENDING + )); + return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); + } + + /** + * Computes the signature for the final window stage. The finalWindowClusterBy will always have the + * partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}. + */ + private RowSignature computeSignatureForFinalWindowStage( + RowSignature rowSignature, + ClusterBy finalWindowClusterBy, + Granularity segmentGranularity + ) + { + final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder() + .addAll(rowSignature) + .add( + QueryKitUtils.PARTITION_BOOST_COLUMN, + ColumnType.LONG + ); + return QueryKitUtils.sortableSignature( + QueryKitUtils.signatureWithSegmentGranularity( + finalWindowStageRowSignatureBuilder.build(), + segmentGranularity + ), + finalWindowClusterBy.getColumns() ); } - return operatorList; + private ShuffleSpec computeShuffleSpecForFinalWindowStage( + ShuffleSpecFactory resultShuffleSpecFactory, + ClusterBy finalWindowClusterBy + ) + { + return resultShuffleSpecFactory.build(finalWindowClusterBy, false); + } + + private int getMaxRowsMaterialized() + { + return MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context()); + } } - private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int partitionCount) + /** + * Represents a window stage in a query execution. + * Each stage can contain a sort operator, a partition operator, and multiple window operators. + */ + private static class WindowStage { - AbstractPartitioningOperatorFactory partition = null; - AbstractSortOperatorFactory sort = null; - for (OperatorFactory of : operatorFactories) { - if (of instanceof AbstractPartitioningOperatorFactory) { - partition = (AbstractPartitioningOperatorFactory) of; - } else if (of instanceof AbstractSortOperatorFactory) { - sort = (AbstractSortOperatorFactory) of; - } + private AbstractSortOperatorFactory sortOperatorFactory; + private AbstractPartitioningOperatorFactory partitioningOperatorFactory; + private final List windowOperatorFactories; + private final int maxRowsMaterialized; + + private WindowStage(int maxRowsMaterialized) + { + this.windowOperatorFactories = new ArrayList<>(); + this.maxRowsMaterialized = maxRowsMaterialized; } - Map sortColumnsMap = new HashMap<>(); - if (sort != null) { - for (ColumnWithDirection sortColumn : sort.getSortColumns()) { - sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); - } - } - - if (partition == null) { - // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. - // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. - return null; - } - - if (partition.getPartitionColumns().isEmpty()) { - return MixShuffleSpec.instance(); - } - - List keyColsOfWindow = new ArrayList<>(); - for (String partitionColumn : partition.getPartitionColumns()) { - KeyColumn kc; - if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { - kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); + private void addOperatorFactory(OperatorFactory op) + { + if (op instanceof AbstractSortOperatorFactory) { + this.sortOperatorFactory = (AbstractSortOperatorFactory) op; + } else if (op instanceof AbstractPartitioningOperatorFactory) { + this.partitioningOperatorFactory = (AbstractPartitioningOperatorFactory) op; } else { - kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); + this.windowOperatorFactories.add((WindowOperatorFactory) op); } - keyColsOfWindow.add(kc); } - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); + private List getOperatorFactories() + { + List operatorFactories = new ArrayList<>(); + if (sortOperatorFactory != null) { + operatorFactories.add(sortOperatorFactory); + } + if (partitioningOperatorFactory != null) { + operatorFactories.add(partitioningOperatorFactory); + } + operatorFactories.addAll(windowOperatorFactories); + return operatorFactories; + } + + /** + * This method converts the operator chain received from native plan into MSQ plan. + * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). + * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. + * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. + * @return + */ + private List getTransformedOperatorFactories() + { + List operatorFactories = new ArrayList<>(); + if (partitioningOperatorFactory != null) { + operatorFactories.add(new GlueingPartitioningOperatorFactory(partitioningOperatorFactory.getPartitionColumns(), maxRowsMaterialized)); + } + if (sortOperatorFactory != null) { + operatorFactories.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); + } + operatorFactories.addAll(windowOperatorFactories); + return operatorFactories; + } + + private List getWindowOperatorFactories() + { + return windowOperatorFactories; + } + + private ShuffleSpec findShuffleSpec(int partitionCount) + { + Map sortColumnsMap = new HashMap<>(); + if (sortOperatorFactory != null) { + for (ColumnWithDirection sortColumn : sortOperatorFactory.getSortColumns()) { + sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); + } + } + + if (partitioningOperatorFactory == null) { + // If the window stage doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. + // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. + return null; + } + + if (partitioningOperatorFactory.getPartitionColumns().isEmpty()) { + return MixShuffleSpec.instance(); + } + + final List keyColsOfWindow = new ArrayList<>(); + for (String partitionColumn : partitioningOperatorFactory.getPartitionColumns()) { + KeyColumn kc = new KeyColumn( + partitionColumn, + sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC + ? KeyOrder.DESCENDING + : KeyOrder.ASCENDING + ); + keyColsOfWindow.add(kc); + } + + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); + } + + private boolean canAccept(OperatorFactory operatorFactory) + { + if (getOperatorFactories().isEmpty()) { + return true; + } + if (operatorFactory instanceof AbstractSortOperatorFactory) { + return false; + } + if (operatorFactory instanceof WindowOperatorFactory) { + return true; + } + if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { + return sortOperatorFactory != null; + } + throw new ISE("Encountered unexpected operatorFactory type: [%s]", operatorFactory.getClass().getName()); + } + + private List getPartitionColumns() + { + return partitioningOperatorFactory == null ? new ArrayList<>() : partitioningOperatorFactory.getPartitionColumns(); + } + + @Override + public String toString() + { + return "WindowStage{" + + "sortOperatorFactory=" + sortOperatorFactory + + ", partitioningOperatorFactory=" + partitioningOperatorFactory + + ", windowOperatorFactories=" + windowOperatorFactories + + '}'; + } } /** @@ -307,65 +450,4 @@ public class WindowOperatorQueryKit implements QueryKit } return queryDefBuilder; } - - /** - * Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account, - * as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}. - */ - private static ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity) - { - final List clusterByColumns = Collections.singletonList(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); - return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); - } - - /** - * Computes the signature for the final window stage. The finalWindowClusterBy will always have the - * partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}. - */ - private static RowSignature computeSignatureForFinalWindowStage(RowSignature rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity) - { - final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder() - .addAll(rowSignature) - .add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); - return QueryKitUtils.sortableSignature( - QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(), segmentGranularity), - finalWindowClusterBy.getColumns() - ); - } - - /** - * This method converts the operator chain received from native plan into MSQ plan. - * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). - * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. - * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. - * @param operatorFactoryListFromQuery - * @param maxRowsMaterializedInWindow - * @return - */ - private List getTransformedOperatorFactoryListForStageDefinition( - List operatorFactoryListFromQuery, - int maxRowsMaterializedInWindow - ) - { - final List operatorFactoryList = new ArrayList<>(); - final List sortOperatorFactoryList = new ArrayList<>(); - for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { - if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { - AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; - operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow)); - } else if (operatorFactory instanceof AbstractSortOperatorFactory) { - AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; - sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); - } else { - // Add all the PartitionSortOperator(s) before every window operator. - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - operatorFactoryList.add(operatorFactory); - } - } - - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - return operatorFactoryList; - } }