MSQ WF: Pass a flag from broker to determine operator chain transformation (#17443)

This commit is contained in:
Akshat Jain 2024-11-12 09:28:28 +05:30 committed by GitHub
parent ae049a4bab
commit 3f56b57c7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 74 additions and 44 deletions

View File

@@ -571,7 +571,7 @@ public class ControllerImpl implements Controller
final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
context.makeQueryKitSpec(makeQueryControllerToolKit(queryContext), queryId, querySpec, queryKernelConfig),
querySpec,
context,
resultsContext
@@ -1211,13 +1211,19 @@ public class ControllerImpl implements Controller
}
@SuppressWarnings("rawtypes")
private QueryKit<Query<?>> makeQueryControllerToolKit()
private QueryKit<Query<?>> makeQueryControllerToolKit(QueryContext queryContext)
{
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.put(
WindowOperatorQuery.class,
new WindowOperatorQueryKit(
context.jsonMapper(),
MultiStageQueryContext.isWindowFunctionOperatorTransformationEnabled(queryContext)
)
)
.build();
return new MultiQueryKit(kitMap);

View File

@@ -40,13 +40,9 @@ import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@@ -101,7 +97,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.frameWriterFactory = frameWriterFactory;
this.resultRowAndCols = new ArrayList<>();
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(queryContext);
this.operatorFactoryList = getOperatorFactoryListForStageDefinition(operatorFactoryList);
this.operatorFactoryList = operatorFactoryList;
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);
this.frameReader = frameReader;
@@ -403,36 +399,4 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
resultRowAndCols.clear();
rowId.set(0);
}
/**
 * Converts the operator chain received from the native plan into an MSQ plan:
 * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into
 * (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
 * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
 * This conversion allows us to blindly read N rows from the input channel and push them into the
 * operator chain, repeating until the input channel is finished.
 *
 * @param operatorFactoryListFromQuery operator factories as planned by the native engine
 * @return the transformed operator factory list to use for this stage definition
 */
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery)
{
  final List<OperatorFactory> transformedFactories = new ArrayList<>();
  final List<OperatorFactory> bufferedSortFactories = new ArrayList<>();

  for (final OperatorFactory original : operatorFactoryListFromQuery) {
    if (original instanceof AbstractPartitioningOperatorFactory) {
      final AbstractPartitioningOperatorFactory partitioningFactory = (AbstractPartitioningOperatorFactory) original;
      transformedFactories.add(
          new GlueingPartitioningOperatorFactory(partitioningFactory.getPartitionColumns(), this.maxRowsMaterialized)
      );
    } else if (original instanceof AbstractSortOperatorFactory) {
      // Buffer sorts; they are emitted as PartitionSortOperator(s) just before the next window operator.
      final AbstractSortOperatorFactory sortFactory = (AbstractSortOperatorFactory) original;
      bufferedSortFactories.add(new PartitionSortOperatorFactory(sortFactory.getSortColumns()));
    } else {
      // Flush buffered PartitionSortOperator(s) so they precede every window operator.
      transformedFactories.addAll(bufferedSortFactories);
      bufferedSortFactories.clear();
      transformedFactories.add(original);
    }
  }

  // Flush any trailing sorts that were not followed by another operator.
  transformedFactories.addAll(bufferedSortFactories);
  bufferedSortFactories.clear();
  return transformedFactories;
}
}

View File

@@ -37,7 +37,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
@@ -54,10 +56,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
private final ObjectMapper jsonMapper;
private final boolean isOperatorTransformationEnabled;
public WindowOperatorQueryKit(ObjectMapper jsonMapper)
/**
 * @param jsonMapper mapper used by this kit when building stage definitions
 * @param isOperatorTransformationEnabled whether the operator chain from the native plan should be
 *                                        transformed into the MSQ-specific form (flag originates from
 *                                        the broker's query context — see commit description)
 */
public WindowOperatorQueryKit(ObjectMapper jsonMapper, boolean isOperatorTransformationEnabled)
{
  this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
  this.jsonMapper = jsonMapper;
}
@Override
@@ -172,6 +176,9 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
.flatMap(of -> of.getPartitionColumns().stream())
.collect(Collectors.toList());
final List<OperatorFactory> operatorFactories = isOperatorTransformationEnabled
? getTransformedOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized)
: operatorList.get(i);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
@@ -181,7 +188,7 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
operatorList.get(i),
operatorFactories,
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
@@ -325,4 +332,40 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
finalWindowClusterBy.getColumns()
);
}
/**
 * Converts the operator chain received from the native plan into an MSQ plan:
 * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into
 * (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
 * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
 * This conversion allows us to blindly read N rows from the input channel and push them into the
 * operator chain, repeating until the input channel is finished.
 *
 * @param operatorFactoryListFromQuery operator factories for this stage as planned by the native engine
 * @param maxRowsMaterializedInWindow row-materialization cap passed through to the glueing partitioning operator
 * @return the transformed operator factory list to use for this stage definition
 */
private List<OperatorFactory> getTransformedOperatorFactoryListForStageDefinition(
    List<OperatorFactory> operatorFactoryListFromQuery,
    int maxRowsMaterializedInWindow
)
{
  final List<OperatorFactory> transformedFactories = new ArrayList<>();
  final List<OperatorFactory> pendingSortFactories = new ArrayList<>();

  for (final OperatorFactory original : operatorFactoryListFromQuery) {
    if (original instanceof AbstractPartitioningOperatorFactory) {
      final AbstractPartitioningOperatorFactory partitioningFactory = (AbstractPartitioningOperatorFactory) original;
      transformedFactories.add(
          new GlueingPartitioningOperatorFactory(partitioningFactory.getPartitionColumns(), maxRowsMaterializedInWindow)
      );
    } else if (original instanceof AbstractSortOperatorFactory) {
      // Hold sorts aside; they are re-emitted as PartitionSortOperator(s) ahead of the next window operator.
      final AbstractSortOperatorFactory sortFactory = (AbstractSortOperatorFactory) original;
      pendingSortFactories.add(new PartitionSortOperatorFactory(sortFactory.getSortColumns()));
    } else {
      // Place all pending PartitionSortOperator(s) before every window operator.
      transformedFactories.addAll(pendingSortFactories);
      pendingSortFactories.clear();
      transformedFactories.add(original);
    }
  }

  // Append any sorts left over at the end of the chain.
  transformedFactories.addAll(pendingSortFactories);
  pendingSortFactories.clear();
  return transformedFactories;
}
}

View File

@@ -289,6 +289,9 @@ public class MSQTaskQueryMaker implements QueryMaker
// Add appropriate finalization to native query context.
nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, finalizeAggregations);
// This flag is to ensure backward compatibility, as brokers are upgraded after indexers/middlemanagers.
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true);
final MSQSpec querySpec =
MSQSpec.builder()
.query(druidQuery.getQuery().withOverriddenContext(nativeQueryContextOverrides))

View File

@@ -191,6 +191,9 @@ public class MultiStageQueryContext
public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";
// This flag ensures backward compatibility and will be removed in Druid 33, with the default behavior as enabled.
public static final String WINDOW_FUNCTION_OPERATOR_TRANSFORMATION = "windowFunctionOperatorTransformation";
public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";
/**
@@ -217,6 +220,14 @@
);
}
/**
 * Whether the window-function operator-chain transformation is enabled for this query.
 * Reads {@link #WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} from the query context; defaults to false
 * (per the flag's comment, this preserves backward compatibility until the default flips in Druid 33).
 */
public static boolean isWindowFunctionOperatorTransformationEnabled(final QueryContext queryContext)
{
  return queryContext.getBoolean(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, false);
}
public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages

View File

@@ -273,6 +273,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll")
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
.build();
public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =

View File

@@ -95,7 +95,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__,
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},
@@ -201,7 +202,8 @@ order by 1;
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__,
"sqlStringifyArrays" : false
"sqlStringifyArrays" : false,
"windowFunctionOperatorTransformation" : true
}
}
},