Fix backward compatibility issues in WindowOperatorQueryFrameProcessorFactory and WindowOperatorQueryFrameProcessor (#17433)

This commit is contained in:
Akshat Jain 2024-10-30 11:32:02 +05:30 committed by GitHub
parent 66eb365e4d
commit 63c91ad813
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 87 additions and 43 deletions

View File

@ -39,9 +39,13 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
@ -95,9 +99,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.inputChannel = inputChannel;
this.outputChannel = outputChannel;
this.frameWriterFactory = frameWriterFactory;
this.operatorFactoryList = operatorFactoryList;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);
this.frameReader = frameReader;
@ -399,4 +403,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
resultRowAndCols.clear();
rowId.set(0);
}
/**
* This method converts the operator chain received from native plan into MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished.
* @param operatorFactoryListFromQuery
* @return
*/
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), this.maxRowsMaterialized));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
@ -59,17 +60,28 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorList;
private final RowSignature stageRowSignature;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;
@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@JsonProperty("query") WindowOperatorQuery query,
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@Deprecated @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@Deprecated @JsonProperty("partitionColumnNames") List<String> partitionColumnNames
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
if (partitionColumnNames == null) {
throw DruidException.defensive("List of partition column names encountered as null.");
}
this.partitionColumnNames = partitionColumnNames;
}
@JsonProperty("query")
@ -90,6 +102,18 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return stageRowSignature;
}
@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}
@JsonProperty("maxRowsMaterializedInWindow")
public int getMaxRowsMaterializedInWindow()
{
return maxRowsMaterializedInWindow;
}
@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
@ -165,14 +189,16 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return false;
}
WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o;
return Objects.equals(query, that.query)
return maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
&& Objects.equals(stageRowSignature, that.stageRowSignature)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames);
}
@Override
public int hashCode()
{
return Objects.hash(query, operatorList, stageRowSignature);
return Objects.hash(query, operatorList, stageRowSignature, maxRowsMaterializedInWindow, partitionColumnNames);
}
}

View File

@ -37,9 +37,7 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
@ -50,6 +48,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
@ -166,6 +165,14 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
log.info("Using row signature [%s] for window stage.", stageRowSignature);
final List<String> partitionColumnNames = operatorList.get(i)
.stream()
.filter(of -> of instanceof AbstractPartitioningOperatorFactory)
.map(of -> (AbstractPartitioningOperatorFactory) of)
.flatMap(of -> of.getPartitionColumns().stream())
.collect(Collectors.toList());
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
@ -174,8 +181,10 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
getOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized),
stageRowSignature
operatorList.get(i),
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
))
);
}
@ -316,37 +325,4 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
finalWindowClusterBy.getColumns()
);
}
/**
* This method converts the operator chain received from native plan into MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished.
* @param operatorFactoryListFromQuery
* @param maxRowsMaterializedInWindow
* @return
*/
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery, int maxRowsMaterializedInWindow)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}

View File

@ -28,7 +28,13 @@ public class WindowOperatorQueryFrameProcessorFactoryTest
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
.withNonnullFields("query", "operatorList", "stageRowSignature")
.withNonnullFields(
"query",
"operatorList",
"stageRowSignature",
"maxRowsMaterializedInWindow",
"partitionColumnNames"
)
.usingGetClass()
.verify();
}