Refactor WindowOperatorQueryKit to use WindowStage class for representing different window stages (#17158)

This commit is contained in:
Akshat Jain 2024-11-12 14:18:16 +05:30 committed by GitHub
parent 8278a1f7df
commit c571e6905d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 313 additions and 231 deletions

View File

@ -33,6 +33,7 @@ import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
@ -50,7 +51,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
@ -72,12 +72,6 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
int minStageNumber
)
{
RowSignature rowSignature = originalQuery.getRowSignature();
log.info("Row signature received for query is [%s].", rowSignature);
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKitSpec,
originalQuery.context(),
@ -88,199 +82,348 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
minStageNumber,
false
);
final RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder()
.get()
.build()
.getFinalStageDefinition()
.getSignature();
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(
operatorList.get(0),
queryKitSpec.getNumPartitionsForShuffle()
final WindowStages windowStages = new WindowStages(
originalQuery,
jsonMapper,
queryKitSpec.getNumPartitionsForShuffle(),
queryKitSpec.getMaxNonLeafWorkerCount(),
resultShuffleSpecFactory,
signatureFromInput,
isOperatorTransformationEnabled
);
final QueryDefinitionBuilder queryDefBuilder =
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec);
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle());
final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec);
final int firstWindowStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
// Get segment granularity from query context, and create ShuffleSpec and RowSignature to be used for the final window stage.
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, queryToRun.getContext());
final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity);
final ShuffleSpec finalWindowStageShuffleSpec = resultShuffleSpecFactory.build(finalWindowClusterBy, false);
final RowSignature finalWindowStageRowSignature = computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, segmentGranularity);
final int maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context());
// There are multiple windows present in the query.
// Create stages for each window in the query.
// These stages will be serialized.
// The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
log.info("Row signature received from last stage is [%s].", signatureFromInput);
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
// Iterate over the list of window stages, and add the definition for each window stage to QueryDefinitionBuilder.
for (int i = 0; i < windowStages.getStages().size(); i++) {
queryDefBuilder.add(windowStages.getStageDefinitionBuilder(firstWindowStageNumber + i, i));
}
/*
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
to be used for a different window stage.
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
*/
for (int i = 0; i < operatorList.size(); i++) {
for (OperatorFactory operatorFactory : operatorList.get(i)) {
if (operatorFactory instanceof WindowOperatorFactory) {
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
// since they need to be present in the row signature for this window stage.
for (String columnName : outputColumnNames) {
int indexInRowSignature = rowSignature.indexOf(columnName);
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
bob.add(columnName, columnType);
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
} else {
throw new ISE(
"Found unexpected column [%s] already present in row signature [%s].",
columnName,
rowSignature
);
}
}
}
}
final RowSignature intermediateSignature = bob.build();
final RowSignature stageRowSignature;
if (i + 1 == operatorList.size()) {
stageRowSignature = finalWindowStageRowSignature;
nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle());
if (nextShuffleSpec == null) {
stageRowSignature = intermediateSignature;
} else {
stageRowSignature = QueryKitUtils.sortableSignature(
intermediateSignature,
nextShuffleSpec.clusterBy().getColumns()
);
}
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
final List<String> partitionColumnNames = operatorList.get(i)
.stream()
.filter(of -> of instanceof AbstractPartitioningOperatorFactory)
.map(of -> (AbstractPartitioningOperatorFactory) of)
.flatMap(of -> of.getPartitionColumns().stream())
.collect(Collectors.toList());
final List<OperatorFactory> operatorFactories = isOperatorTransformationEnabled
? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized)
: operatorList.get(i);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
.signature(stageRowSignature)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
operatorFactories,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
))
);
}
return queryDefBuilder.build();
}
/**
*
* @param originalQuery
* @return A list of list of operator factories, where each list represents the operator factories for a particular
* window stage.
* Represents the window stages to be added to {@link QueryDefinitionBuilder}.
* This class is responsible for creating the window stages.
*/
private List<List<OperatorFactory>> getOperatorListFromQuery(WindowOperatorQuery originalQuery)
private static class WindowStages
{
List<List<OperatorFactory>> operatorList = new ArrayList<>();
final List<OperatorFactory> operators = originalQuery.getOperators();
List<OperatorFactory> currentStage = new ArrayList<>();
private final List<WindowStage> stages;
private final WindowOperatorQuery query;
private final int numPartitionsForShuffle;
private final int maxNonLeafWorkerCount;
private final ShuffleSpec finalWindowStageShuffleSpec;
private final RowSignature finalWindowStageRowSignature;
private final RowSignature.Builder rowSignatureBuilder;
private final boolean isOperatorTransformationEnabled;
for (int i = 0; i < operators.size(); i++) {
OperatorFactory of = operators.get(i);
currentStage.add(of);
private WindowStages(
WindowOperatorQuery query,
ObjectMapper jsonMapper,
int numPartitionsForShuffle,
int maxNonLeafWorkerCount,
ShuffleSpecFactory resultShuffleSpecFactory,
RowSignature signatureFromInput,
boolean isOperatorTransformationEnabled
)
{
this.stages = new ArrayList<>();
this.query = query;
this.numPartitionsForShuffle = numPartitionsForShuffle;
this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
if (of instanceof WindowOperatorFactory) {
// Process consecutive window operators
while (i + 1 < operators.size() && operators.get(i + 1) instanceof WindowOperatorFactory) {
i++;
currentStage.add(operators.get(i));
}
final Granularity segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(
jsonMapper,
query.getContext()
);
final ClusterBy finalWindowClusterBy = computeClusterByForFinalWindowStage(segmentGranularity);
this.finalWindowStageShuffleSpec = computeShuffleSpecForFinalWindowStage(
resultShuffleSpecFactory,
finalWindowClusterBy
);
this.finalWindowStageRowSignature = computeSignatureForFinalWindowStage(
query.getRowSignature(),
finalWindowClusterBy,
segmentGranularity
);
// Finalize the current stage
operatorList.add(new ArrayList<>(currentStage));
currentStage.clear();
}
this.rowSignatureBuilder = RowSignature.builder().addAll(signatureFromInput);
populateStages();
}
// There shouldn't be any operators left in currentStage. The last operator should always be WindowOperatorFactory.
if (!currentStage.isEmpty()) {
throw new ISE(
"Found unexpected operators [%s] present in the list of operators [%s].",
currentStage,
operators
private void populateStages()
{
WindowStage currentStage = new WindowStage(getMaxRowsMaterialized());
for (OperatorFactory of : query.getOperators()) {
if (!currentStage.canAccept(of)) {
stages.add(currentStage);
currentStage = new WindowStage(getMaxRowsMaterialized());
}
currentStage.addOperatorFactory(of);
}
if (!currentStage.getOperatorFactories().isEmpty()) {
stages.add(currentStage);
}
log.info("Created window stages: [%s]", stages);
}
private List<WindowStage> getStages()
{
return stages;
}
private RowSignature getRowSignatureForStage(int windowStageIndex, ShuffleSpec shuffleSpec)
{
if (windowStageIndex == stages.size() - 1) {
return finalWindowStageRowSignature;
}
final WindowStage stage = stages.get(windowStageIndex);
for (WindowOperatorFactory operatorFactory : stage.getWindowOperatorFactories()) {
for (String columnName : operatorFactory.getProcessor().getOutputColumnNames()) {
int indexInRowSignature = query.getRowSignature().indexOf(columnName);
if (indexInRowSignature != -1 && rowSignatureBuilder.build().indexOf(columnName) == -1) {
ColumnType columnType = query.getRowSignature().getColumnType(indexInRowSignature).get();
rowSignatureBuilder.add(columnName, columnType);
}
}
}
final RowSignature intermediateSignature = rowSignatureBuilder.build();
final RowSignature stageRowSignature;
if (shuffleSpec == null) {
stageRowSignature = intermediateSignature;
} else {
stageRowSignature = QueryKitUtils.sortableSignature(
intermediateSignature,
shuffleSpec.clusterBy().getColumns()
);
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
return stageRowSignature;
}
private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, int windowStageIndex)
{
final WindowStage stage = stages.get(windowStageIndex);
final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ?
finalWindowStageShuffleSpec :
stages.get(windowStageIndex + 1).findShuffleSpec(numPartitionsForShuffle);
final RowSignature stageRowSignature = getRowSignatureForStage(windowStageIndex, shuffleSpec);
final List<OperatorFactory> operatorFactories = isOperatorTransformationEnabled
? stage.getTransformedOperatorFactories()
: stage.getOperatorFactories();
return StageDefinition.builder(stageNumber)
.inputs(new StageInputSpec(stageNumber - 1))
.signature(stageRowSignature)
.maxWorkerCount(maxNonLeafWorkerCount)
.shuffleSpec(shuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
query,
operatorFactories,
stageRowSignature,
getMaxRowsMaterialized(),
stage.getPartitionColumns()
));
}
/**
* Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account,
* as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}.
*/
private ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity)
{
final List<KeyColumn> clusterByColumns = Collections.singletonList(new KeyColumn(
QueryKitUtils.PARTITION_BOOST_COLUMN,
KeyOrder.ASCENDING
));
return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
}
/**
* Computes the signature for the final window stage. The finalWindowClusterBy will always have the
* partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}.
*/
private RowSignature computeSignatureForFinalWindowStage(
RowSignature rowSignature,
ClusterBy finalWindowClusterBy,
Granularity segmentGranularity
)
{
final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder()
.addAll(rowSignature)
.add(
QueryKitUtils.PARTITION_BOOST_COLUMN,
ColumnType.LONG
);
return QueryKitUtils.sortableSignature(
QueryKitUtils.signatureWithSegmentGranularity(
finalWindowStageRowSignatureBuilder.build(),
segmentGranularity
),
finalWindowClusterBy.getColumns()
);
}
return operatorList;
private ShuffleSpec computeShuffleSpecForFinalWindowStage(
ShuffleSpecFactory resultShuffleSpecFactory,
ClusterBy finalWindowClusterBy
)
{
return resultShuffleSpecFactory.build(finalWindowClusterBy, false);
}
private int getMaxRowsMaterialized()
{
return MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
}
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int partitionCount)
/**
* Represents a window stage in a query execution.
* Each stage can contain a sort operator, a partition operator, and multiple window operators.
*/
private static class WindowStage
{
AbstractPartitioningOperatorFactory partition = null;
AbstractSortOperatorFactory sort = null;
for (OperatorFactory of : operatorFactories) {
if (of instanceof AbstractPartitioningOperatorFactory) {
partition = (AbstractPartitioningOperatorFactory) of;
} else if (of instanceof AbstractSortOperatorFactory) {
sort = (AbstractSortOperatorFactory) of;
}
private AbstractSortOperatorFactory sortOperatorFactory;
private AbstractPartitioningOperatorFactory partitioningOperatorFactory;
private final List<WindowOperatorFactory> windowOperatorFactories;
private final int maxRowsMaterialized;
private WindowStage(int maxRowsMaterialized)
{
this.windowOperatorFactories = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterialized;
}
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
if (partition == null) {
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}
if (partition.getPartitionColumns().isEmpty()) {
return MixShuffleSpec.instance();
}
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
private void addOperatorFactory(OperatorFactory op)
{
if (op instanceof AbstractSortOperatorFactory) {
this.sortOperatorFactory = (AbstractSortOperatorFactory) op;
} else if (op instanceof AbstractPartitioningOperatorFactory) {
this.partitioningOperatorFactory = (AbstractPartitioningOperatorFactory) op;
} else {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
this.windowOperatorFactories.add((WindowOperatorFactory) op);
}
keyColsOfWindow.add(kc);
}
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount);
private List<OperatorFactory> getOperatorFactories()
{
List<OperatorFactory> operatorFactories = new ArrayList<>();
if (sortOperatorFactory != null) {
operatorFactories.add(sortOperatorFactory);
}
if (partitioningOperatorFactory != null) {
operatorFactories.add(partitioningOperatorFactory);
}
operatorFactories.addAll(windowOperatorFactories);
return operatorFactories;
}
/**
* This method converts the operator chain received from native plan into MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished.
* @return
*/
private List<OperatorFactory> getTransformedOperatorFactories()
{
List<OperatorFactory> operatorFactories = new ArrayList<>();
if (partitioningOperatorFactory != null) {
operatorFactories.add(new GlueingPartitioningOperatorFactory(partitioningOperatorFactory.getPartitionColumns(), maxRowsMaterialized));
}
if (sortOperatorFactory != null) {
operatorFactories.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
}
operatorFactories.addAll(windowOperatorFactories);
return operatorFactories;
}
private List<WindowOperatorFactory> getWindowOperatorFactories()
{
return windowOperatorFactories;
}
private ShuffleSpec findShuffleSpec(int partitionCount)
{
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
if (sortOperatorFactory != null) {
for (ColumnWithDirection sortColumn : sortOperatorFactory.getSortColumns()) {
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
if (partitioningOperatorFactory == null) {
// If the window stage doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}
if (partitioningOperatorFactory.getPartitionColumns().isEmpty()) {
return MixShuffleSpec.instance();
}
final List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partitioningOperatorFactory.getPartitionColumns()) {
KeyColumn kc = new KeyColumn(
partitionColumn,
sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC
? KeyOrder.DESCENDING
: KeyOrder.ASCENDING
);
keyColsOfWindow.add(kc);
}
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount);
}
private boolean canAccept(OperatorFactory operatorFactory)
{
if (getOperatorFactories().isEmpty()) {
return true;
}
if (operatorFactory instanceof AbstractSortOperatorFactory) {
return false;
}
if (operatorFactory instanceof WindowOperatorFactory) {
return true;
}
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
return sortOperatorFactory != null;
}
throw new ISE("Encountered unexpected operatorFactory type: [%s]", operatorFactory.getClass().getName());
}
private List<String> getPartitionColumns()
{
return partitioningOperatorFactory == null ? new ArrayList<>() : partitioningOperatorFactory.getPartitionColumns();
}
@Override
public String toString()
{
return "WindowStage{" +
"sortOperatorFactory=" + sortOperatorFactory +
", partitioningOperatorFactory=" + partitioningOperatorFactory +
", windowOperatorFactories=" + windowOperatorFactories +
'}';
}
}
/**
@ -307,65 +450,4 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
}
return queryDefBuilder;
}
/**
* Computes the ClusterBy for the final window stage. We don't have to take the CLUSTERED BY columns into account,
* as they are handled as {@link org.apache.druid.query.scan.ScanQuery#orderBys}.
*/
private static ClusterBy computeClusterByForFinalWindowStage(Granularity segmentGranularity)
{
final List<KeyColumn> clusterByColumns = Collections.singletonList(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
return QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
}
/**
* Computes the signature for the final window stage. The finalWindowClusterBy will always have the
* partition boost column as computed in {@link #computeClusterByForFinalWindowStage(Granularity)}.
*/
private static RowSignature computeSignatureForFinalWindowStage(RowSignature rowSignature, ClusterBy finalWindowClusterBy, Granularity segmentGranularity)
{
final RowSignature.Builder finalWindowStageRowSignatureBuilder = RowSignature.builder()
.addAll(rowSignature)
.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
return QueryKitUtils.sortableSignature(
QueryKitUtils.signatureWithSegmentGranularity(finalWindowStageRowSignatureBuilder.build(), segmentGranularity),
finalWindowClusterBy.getColumns()
);
}
/**
* This method converts the operator chain received from native plan into MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished.
* @param operatorFactoryListFromQuery
* @param maxRowsMaterializedInWindow
* @return
*/
private List<OperatorFactory> getTransformedOperatorFactoryListForStageDefinition(
List<OperatorFactory> operatorFactoryListFromQuery,
int maxRowsMaterializedInWindow
)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}