From 63c91ad813eeac8ce3a596bf49c4eb885e220f63 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 30 Oct 2024 11:32:02 +0530 Subject: [PATCH] Fix backward compatibility issues in WindowOperatorQueryFrameProcessorFactory and WindowOperatorQueryFrameProcessor (#17433) --- .../WindowOperatorQueryFrameProcessor.java | 38 +++++++++++++- ...dowOperatorQueryFrameProcessorFactory.java | 34 +++++++++++-- .../msq/querykit/WindowOperatorQueryKit.java | 50 +++++-------------- ...peratorQueryFrameProcessorFactoryTest.java | 8 ++- 4 files changed, 87 insertions(+), 43 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 04cdab3b1fe..b3bcf899ec1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -39,9 +39,13 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; +import org.apache.druid.query.operator.AbstractSortOperatorFactory; +import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.PartitionSortOperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import 
org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; @@ -95,9 +99,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor this.inputChannel = inputChannel; this.outputChannel = outputChannel; this.frameWriterFactory = frameWriterFactory; - this.operatorFactoryList = operatorFactoryList; this.resultRowAndCols = new ArrayList<>(); this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context()); + this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList); this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized); this.frameReader = frameReader; @@ -399,4 +403,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor resultRowAndCols.clear(); rowId.set(0); } + + /** + * This method converts the operator chain received from the native plan into the MSQ plan. + * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). + * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. + * This conversion allows us to blindly read N rows from the input channel and push them into the operator chain, and repeat until the input channel is finished.
+ * @param operatorFactoryListFromQuery the operator factories received from the native plan, in their original order + * @return a new list in which every partitioning factory is replaced by a GlueingPartitioningOperatorFactory and every sort factory by a PartitionSortOperatorFactory placed before the next remaining operator + */ + private List getOperatorFactoryListForStageDefinition(List operatorFactoryListFromQuery) + { + final List operatorFactoryList = new ArrayList<>(); + final List sortOperatorFactoryList = new ArrayList<>(); + for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { + if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { + AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; + operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), this.maxRowsMaterialized)); + } else if (operatorFactory instanceof AbstractSortOperatorFactory) { + AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; + sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); + } else { + // Add all the PartitionSortOperator(s) before every window operator. + operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + operatorFactoryList.add(operatorFactory); + } + } + + operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + return operatorFactoryList; + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index 2f97ffd74b4..68f6f564774 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import
it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.processor.OutputChannel; import org.apache.druid.frame.processor.OutputChannelFactory; @@ -59,17 +60,28 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor private final WindowOperatorQuery query; private final List operatorList; private final RowSignature stageRowSignature; + private final int maxRowsMaterializedInWindow; + private final List partitionColumnNames; @JsonCreator public WindowOperatorQueryFrameProcessorFactory( @JsonProperty("query") WindowOperatorQuery query, @JsonProperty("operatorList") List operatorFactoryList, - @JsonProperty("stageRowSignature") RowSignature stageRowSignature + @JsonProperty("stageRowSignature") RowSignature stageRowSignature, + @Deprecated @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow, + @Deprecated @JsonProperty("partitionColumnNames") List partitionColumnNames ) { this.query = Preconditions.checkNotNull(query, "query"); this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator"); this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); + + this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow; + + if (partitionColumnNames == null) { + throw DruidException.defensive("List of partition column names encountered as null."); + } + this.partitionColumnNames = partitionColumnNames; } @JsonProperty("query") @@ -90,6 +102,18 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor return stageRowSignature; } + @JsonProperty("partitionColumnNames") + public List getPartitionColumnNames() + { + return partitionColumnNames; + } + + @JsonProperty("maxRowsMaterializedInWindow") + public int getMaxRowsMaterializedInWindow() + { + return maxRowsMaterializedInWindow; + } + @Override public ProcessorsAndChannels 
makeProcessors( StageDefinition stageDefinition, @@ -165,14 +189,16 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor return false; } WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o; - return Objects.equals(query, that.query) + return maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow + && Objects.equals(query, that.query) && Objects.equals(operatorList, that.operatorList) - && Objects.equals(stageRowSignature, that.stageRowSignature); + && Objects.equals(stageRowSignature, that.stageRowSignature) + && Objects.equals(partitionColumnNames, that.partitionColumnNames); } @Override public int hashCode() { - return Objects.hash(query, operatorList, stageRowSignature); + return Objects.hash(query, operatorList, stageRowSignature, maxRowsMaterializedInWindow, partitionColumnNames); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index bc789528b06..b168860ff4c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -37,9 +37,7 @@ import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; import org.apache.druid.query.operator.AbstractSortOperatorFactory; import org.apache.druid.query.operator.ColumnWithDirection; -import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; import org.apache.druid.query.operator.OperatorFactory; -import org.apache.druid.query.operator.PartitionSortOperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.operator.window.WindowOperatorFactory; import 
org.apache.druid.segment.column.ColumnType; @@ -50,6 +48,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class WindowOperatorQueryKit implements QueryKit { @@ -166,6 +165,14 @@ public class WindowOperatorQueryKit implements QueryKit log.info("Using row signature [%s] for window stage.", stageRowSignature); + final List partitionColumnNames = operatorList.get(i) + .stream() + .filter(of -> of instanceof AbstractPartitioningOperatorFactory) + .map(of -> (AbstractPartitioningOperatorFactory) of) + .flatMap(of -> of.getPartitionColumns().stream()) + .collect(Collectors.toList()); + + queryDefBuilder.add( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 1)) @@ -174,8 +181,10 @@ public class WindowOperatorQueryKit implements QueryKit .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, - getOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized), - stageRowSignature + operatorList.get(i), + stageRowSignature, + maxRowsMaterialized, + partitionColumnNames )) ); } @@ -316,37 +325,4 @@ public class WindowOperatorQueryKit implements QueryKit finalWindowClusterBy.getColumns() ); } - - /** - * This method converts the operator chain received from native plan into MSQ plan. - * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). - * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. - * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. 
- * @param operatorFactoryListFromQuery - * @param maxRowsMaterializedInWindow - * @return - */ - private List getOperatorFactoryListForStageDefinition(List operatorFactoryListFromQuery, int maxRowsMaterializedInWindow) - { - final List operatorFactoryList = new ArrayList<>(); - final List sortOperatorFactoryList = new ArrayList<>(); - for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { - if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { - AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; - operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow)); - } else if (operatorFactory instanceof AbstractSortOperatorFactory) { - AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; - sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); - } else { - // Add all the PartitionSortOperator(s) before every window operator. 
- operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - operatorFactoryList.add(operatorFactory); - } - } - - operatorFactoryList.addAll(sortOperatorFactoryList); - sortOperatorFactoryList.clear(); - return operatorFactoryList; - } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java index 58affd228e1..811a17dea62 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java @@ -28,7 +28,13 @@ public class WindowOperatorQueryFrameProcessorFactoryTest public void testEqualsAndHashcode() { EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class) - .withNonnullFields("query", "operatorList", "stageRowSignature") + .withNonnullFields( + "query", + "operatorList", + "stageRowSignature", + "maxRowsMaterializedInWindow", + "partitionColumnNames" + ) .usingGetClass() .verify(); }