MSQ: Add QueryKitSpec to encapsulate QueryKit params. (#17077)

* MSQ: Add QueryKitSpec to encapsulate QueryKit params.

This patch introduces QueryKitSpec, an object that encapsulates the
parameters to makeQueryDefinition that are consistent from call to
call. This simplifies call sites, which no longer need to pass all of
those components around individually.

This patch also splits "maxWorkerCount" into "maxLeafWorkerCount" and
"maxNonLeafWorkerCount", which apply to leaf stages (stages with no
other stages as inputs) and non-leaf stages, respectively.

Finally, this patch also provides a way for ControllerContext to supply a
QueryKitSpec to its liking. It is expected that this will be used by
controllers of quick interactive queries to set maxNonLeafWorkerCount = 1,
which will generate fanning-in query plans (see the sketch after this
message).

* Fix javadoc.
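
As a rough sketch of the anticipated usage (hypothetical; this patch only adds
the hook, not such a controller), a controller context for quick interactive
queries could implement makeQueryKitSpec along these lines:

    @Override
    public QueryKitSpec makeQueryKitSpec(
        final QueryKit<Query<?>> queryKit,
        final String queryId,
        final MSQSpec querySpec,
        final ControllerQueryKernelConfig queryKernelConfig
    )
    {
      // Leaf stages still fan out across all workers; non-leaf stages collapse
      // to a single worker, producing a fanning-in plan for interactive queries.
      return new QueryKitSpec(
          queryKit,
          queryId,
          querySpec.getTuningConfig().getMaxNumWorkers(), // maxLeafWorkerCount
          1,                                              // maxNonLeafWorkerCount
          1                                               // targetPartitionsPerWorker
      );
    }
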
Gian Merlino 2024-09-17 13:37:14 -07:00 committed by GitHub
parent a93546d493
commit c5968aa463
11 changed files with 241 additions and 194 deletions

View File

@ -30,7 +30,8 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.query.Query;
import org.apache.druid.server.DruidNode;
/**
@ -103,8 +104,13 @@ public interface ControllerContext
WorkerClient newWorkerClient();
/**
* Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using
* {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}.
* Create a {@link QueryKitSpec}. This method gives controller contexts a way to customize parameters around the
* number of workers and partitions.
*/
int defaultTargetPartitionsPerWorker();
QueryKitSpec makeQueryKitSpec(
QueryKit<Query<?>> queryKit,
String queryId,
MSQSpec querySpec,
ControllerQueryKernelConfig queryKernelConfig
);
}

View File

@ -152,6 +152,7 @@ import org.apache.druid.msq.kernel.controller.ControllerStagePhase;
import org.apache.druid.msq.kernel.controller.WorkerInputs;
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
@ -567,14 +568,9 @@ public class ControllerImpl implements Controller
final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
queryId(),
makeQueryControllerToolKit(),
context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
querySpec,
context.jsonMapper(),
MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
queryContext,
context.defaultTargetPartitionsPerWorker()
),
resultsContext
);
@ -1201,7 +1197,7 @@ public class ControllerImpl implements Controller
}
@SuppressWarnings("rawtypes")
private QueryKit makeQueryControllerToolKit()
private QueryKit<Query<?>> makeQueryControllerToolKit()
{
final Map<Class<? extends Query>, QueryKit> kitMap =
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
@ -1725,11 +1721,9 @@ public class ControllerImpl implements Controller
@SuppressWarnings("unchecked")
private static QueryDefinition makeQueryDefinition(
final String queryId,
@SuppressWarnings("rawtypes") final QueryKit toolKit,
final QueryKitSpec queryKitSpec,
final MSQSpec querySpec,
final ObjectMapper jsonMapper,
final int targetPartitionsPerWorker,
final ResultsContext resultsContext
)
{
@ -1773,13 +1767,10 @@ public class ControllerImpl implements Controller
final QueryDefinition queryDef;
try {
queryDef = toolKit.makeQueryDefinition(
queryId,
queryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
queryKitSpec,
queryToPlan,
toolKit,
resultShuffleSpecFactory,
tuningConfig.getMaxNumWorkers(),
targetPartitionsPerWorker,
0
);
}
@ -1808,7 +1799,7 @@ public class ControllerImpl implements Controller
// Add all query stages.
// Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage.
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
if (stageDef.equals(finalShuffleStageDef)) {
@ -1834,7 +1825,7 @@ public class ControllerImpl implements Controller
// attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
if (finalShuffleStageDef.doesSortDuringShuffle()) {
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@ -1871,7 +1862,7 @@ public class ControllerImpl implements Controller
}
final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@ -1879,7 +1870,7 @@ public class ControllerImpl implements Controller
.signature(queryDef.getFinalStageDefinition().getSignature())
.shuffleSpec(null)
.processorFactory(new ExportResultsFrameProcessorFactory(
queryId,
queryKitSpec.getQueryId(),
exportStorageProvider,
resultFormat,
columnMappings,

View File

@ -47,8 +47,11 @@ import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
@ -203,11 +206,26 @@ public class IndexerControllerContext implements ControllerContext
}
@Override
public int defaultTargetPartitionsPerWorker()
public QueryKitSpec makeQueryKitSpec(
final QueryKit<Query<?>> queryKit,
final String queryId,
final MSQSpec querySpec,
final ControllerQueryKernelConfig queryKernelConfig
)
{
// Assume tasks are symmetric: workers have the same number of processors available as a controller.
// Create one partition per processor per task, for maximum parallelism.
return memoryIntrospector.numProcessingThreads();
return new QueryKitSpec(
queryKit,
queryId,
querySpec.getTuningConfig().getMaxNumWorkers(),
querySpec.getTuningConfig().getMaxNumWorkers(),
// Assume tasks are symmetric: workers have the same number of processors available as a controller.
// Create one partition per processor per task, for maximum parallelism.
MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
querySpec.getQuery().context(),
memoryIntrospector.numProcessingThreads()
)
);
}
/**

View File

@ -123,8 +123,7 @@ public class DataSourcePlan
/**
* Build a plan.
*
* @param queryKit query kit reference for recursive planning
* @param queryId query ID
* @param queryKitSpec reference for recursive planning
* @param queryContext query context
* @param dataSource datasource to plan
* @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned
@ -132,22 +131,17 @@ public class DataSourcePlan
* @param filter filter for best-effort pruning. The returned plan may or may not be filtered to this
* filter. Query processing must still apply the filter to generate correct results.
* @param filterFields which fields from the filter to consider for pruning, or null to consider all fields.
* @param maxWorkerCount maximum number of workers for subqueries
* @param minStageNumber starting stage number for subqueries
* @param broadcast whether the plan should broadcast data for this datasource
* @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries
*/
@SuppressWarnings("rawtypes")
public static DataSourcePlan forDataSource(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final DataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
@Nullable Set<String> filterFields,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
@ -182,51 +176,38 @@ public class DataSourcePlan
return forLookup((LookupDataSource) dataSource, broadcast);
} else if (dataSource instanceof FilteredDataSource) {
return forFilteredDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
(FilteredDataSource) dataSource,
querySegmentSpec,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
} else if (dataSource instanceof UnnestDataSource) {
return forUnnest(
queryKit,
queryId,
queryKitSpec,
queryContext,
(UnnestDataSource) dataSource,
querySegmentSpec,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
} else if (dataSource instanceof QueryDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forQuery(
queryKit,
queryId,
queryKitSpec,
(QueryDataSource) dataSource,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast,
queryContext
broadcast
);
} else if (dataSource instanceof UnionDataSource) {
return forUnion(
queryKit,
queryId,
queryKitSpec,
queryContext,
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
@ -240,27 +221,21 @@ public class DataSourcePlan
switch (deducedJoinAlgorithm) {
case BROADCAST:
return forBroadcastHashJoin(
queryKit,
queryId,
queryKitSpec,
queryContext,
(JoinDataSource) dataSource,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
case SORT_MERGE:
return forSortMergeJoin(
queryKit,
queryId,
queryKitSpec,
(JoinDataSource) dataSource,
querySegmentSpec,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
@ -422,25 +397,18 @@ public class DataSourcePlan
}
private static DataSourcePlan forQuery(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryDataSource dataSource,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast,
@Nullable final QueryContext parentContext
final boolean broadcast
)
{
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,
final QueryDefinition subQueryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
queryKitSpec,
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
queryKit,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker),
maxWorkerCount,
targetPartitionsPerWorker,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()),
minStageNumber
);
@ -455,27 +423,21 @@ public class DataSourcePlan
}
private static DataSourcePlan forFilteredDataSource(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final FilteredDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
{
final DataSourcePlan basePlan = forDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
dataSource.getBase(),
querySegmentSpec,
null,
null,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
@ -497,28 +459,22 @@ public class DataSourcePlan
* Build a plan for Unnest data source
*/
private static DataSourcePlan forUnnest(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final UnnestDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
{
// Find the plan for base data source by recursing
final DataSourcePlan basePlan = forDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
dataSource.getBase(),
querySegmentSpec,
null,
null,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
broadcast
);
@ -543,15 +499,12 @@ public class DataSourcePlan
}
private static DataSourcePlan forUnion(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
@Nullable Set<String> filterFields,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
@ -559,22 +512,19 @@ public class DataSourcePlan
// This is done to prevent loss of generality since MSQ can plan any type of DataSource.
List<DataSource> children = unionDataSource.getDataSources();
final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId());
final List<DataSource> newChildren = new ArrayList<>();
final List<InputSpec> inputSpecs = new ArrayList<>();
final IntSet broadcastInputs = new IntOpenHashSet();
for (DataSource child : children) {
DataSourcePlan childDataSourcePlan = forDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
child,
querySegmentSpec,
filter,
filterFields,
maxWorkerCount,
targetPartitionsPerWorker,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
);
@ -598,32 +548,26 @@ public class DataSourcePlan
* Build a plan for broadcast hash-join.
*/
private static DataSourcePlan forBroadcastHashJoin(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final QueryContext queryContext,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable final DimFilter filter,
@Nullable final Set<String> filterFields,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
{
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId());
final DataSourceAnalysis analysis = dataSource.getAnalysis();
final DataSourcePlan basePlan = forDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
analysis.getBaseDataSource(),
querySegmentSpec,
filter,
filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis),
maxWorkerCount,
targetPartitionsPerWorker,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
broadcast
);
@ -636,15 +580,12 @@ public class DataSourcePlan
for (int i = 0; i < analysis.getPreJoinableClauses().size(); i++) {
final PreJoinableClause clause = analysis.getPreJoinableClauses().get(i);
final DataSourcePlan clausePlan = forDataSource(
queryKit,
queryId,
queryKitSpec,
queryContext,
clause.getDataSource(),
new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY),
null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly.
null,
maxWorkerCount,
targetPartitionsPerWorker,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
true // Always broadcast right-hand side of the join.
);
@ -674,12 +615,9 @@ public class DataSourcePlan
* Build a plan for sort-merge join.
*/
private static DataSourcePlan forSortMergeJoin(
final QueryKit queryKit,
final String queryId,
final QueryKitSpec queryKitSpec,
final JoinDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber,
final boolean broadcast
)
@ -692,20 +630,16 @@ public class DataSourcePlan
SortMergeJoinFrameProcessorFactory.validateCondition(dataSource.getConditionAnalysis())
);
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId());
// Plan the left input.
// We're confident that we can cast dataSource.getLeft() to QueryDataSource, because DruidJoinQueryRel creates
// subqueries when the join algorithm is sortMerge.
final DataSourcePlan leftPlan = forQuery(
queryKit,
queryId,
queryKitSpec,
(QueryDataSource) dataSource.getLeft(),
maxWorkerCount,
targetPartitionsPerWorker,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false,
null
false
);
leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
@ -713,14 +647,10 @@ public class DataSourcePlan
// We're confident that we can cast dataSource.getRight() to QueryDataSource, because DruidJoinQueryRel creates
// subqueries when the join algorithm is sortMerge.
final DataSourcePlan rightPlan = forQuery(
queryKit,
queryId,
queryKitSpec,
(QueryDataSource) dataSource.getRight(),
maxWorkerCount,
targetPartitionsPerWorker,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false,
null
false
);
rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
@ -729,7 +659,7 @@ public class DataSourcePlan
((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber()
);
final int hashPartitionCount = maxWorkerCount * targetPartitionsPerWorker;
final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle();
final List<KeyColumn> leftPartitionKey = partitionKeys.get(0);
leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount));
leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey));
@ -768,7 +698,7 @@ public class DataSourcePlan
Iterables.getOnlyElement(rightPlan.getInputSpecs())
)
)
.maxWorkerCount(maxWorkerCount)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.signature(joinSignatureBuilder.build())
.processorFactory(
new SortMergeJoinFrameProcessorFactory(

View File

@ -41,12 +41,9 @@ public class MultiQueryKit implements QueryKit<Query<?>>
@Override
public QueryDefinition makeQueryDefinition(
String queryId,
QueryKitSpec queryKitSpec,
Query<?> query,
QueryKit<Query<?>> toolKitForSubQueries,
ShuffleSpecFactory resultShuffleSpecFactory,
int maxWorkerCount,
int targetPartitionsPerWorker,
int minStageNumber
)
{
@ -55,12 +52,9 @@ public class MultiQueryKit implements QueryKit<Query<?>>
if (specificToolKit != null) {
//noinspection unchecked
return specificToolKit.makeQueryDefinition(
queryId,
queryKitSpec,
query,
this,
resultShuffleSpecFactory,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber
);
} else {

View File

@ -30,25 +30,17 @@ public interface QueryKit<QueryType extends Query<?>>
/**
* Creates a {@link QueryDefinition} from a {@link Query}.
*
* @param queryId query ID of the resulting {@link QueryDefinition}
* @param queryKitSpec collection of parameters necessary for planning {@link QueryDefinition}
* @param query native query to translate
* @param toolKitForSubQueries kit that is used to translate native subqueries; i.e.,
* {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
* @param resultShuffleSpecFactory shuffle spec factory for the final output of this query.
* @param maxWorkerCount maximum number of workers: becomes
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}
* @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting
* {@link QueryDefinition} is going to be added to an existing
* {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}.
* @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries
*/
QueryDefinition makeQueryDefinition(
String queryId,
QueryKitSpec queryKitSpec,
QueryType query,
QueryKit<Query<?>> toolKitForSubQueries,
ShuffleSpecFactory resultShuffleSpecFactory,
int maxWorkerCount,
int targetPartitionsPerWorker,
int minStageNumber
);
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.query.Query;
import java.util.List;
/**
* Collection of parameters for {@link QueryKit#makeQueryDefinition}.
*/
public class QueryKitSpec
{
private final QueryKit<Query<?>> queryKit;
private final String queryId;
private final int maxLeafWorkerCount;
private final int maxNonLeafWorkerCount;
private final int targetPartitionsPerWorker;
/**
* @param queryKit kit that is used to translate native subqueries; i.e.,
* {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
* @param queryId queryId of the resulting {@link QueryDefinition}
* @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}
* @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}
* @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries
*/
public QueryKitSpec(
QueryKit<Query<?>> queryKit,
String queryId,
int maxLeafWorkerCount,
int maxNonLeafWorkerCount,
int targetPartitionsPerWorker
)
{
this.queryId = queryId;
this.queryKit = queryKit;
this.maxLeafWorkerCount = maxLeafWorkerCount;
this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
this.targetPartitionsPerWorker = targetPartitionsPerWorker;
}
/**
* Instance of {@link QueryKit} for recursive calls.
*/
public QueryKit<Query<?>> getQueryKit()
{
return queryKit;
}
/**
* Query ID to use when building {@link QueryDefinition}.
*/
public String getQueryId()
{
return queryId;
}
/**
* Maximum worker count for a stage with the given inputs. Will use {@link #maxNonLeafWorkerCount} if there are
* any stage inputs, {@link #maxLeafWorkerCount} otherwise.
*/
public int getMaxWorkerCount(final List<InputSpec> inputSpecs)
{
if (InputSpecs.getStageNumbers(inputSpecs).isEmpty()) {
return maxLeafWorkerCount;
} else {
return maxNonLeafWorkerCount;
}
}
/**
* Maximum number of workers for non-leaf stages (where there are some stage inputs).
*/
public int getMaxNonLeafWorkerCount()
{
return maxNonLeafWorkerCount;
}
/**
* Number of partitions to generate during a shuffle.
*/
public int getNumPartitionsForShuffle()
{
return maxNonLeafWorkerCount * targetPartitionsPerWorker;
}
}
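
For illustration only (the numbers and the queryKit variable are made up, and
the Collections/StageInputSpec imports are assumed; this snippet is not part of
the patch), the leaf/non-leaf split and shuffle sizing behave as follows:

    // maxLeafWorkerCount = 10, maxNonLeafWorkerCount = 2, targetPartitionsPerWorker = 4
    final QueryKitSpec spec = new QueryKitSpec(queryKit, "q1", 10, 2, 4);
    spec.getMaxWorkerCount(Collections.emptyList());                  // no stage inputs -> leaf -> 10
    spec.getMaxWorkerCount(
        Collections.<InputSpec>singletonList(new StageInputSpec(0))); // stage input -> non-leaf -> 2
    spec.getNumPartitionsForShuffle();                                // 2 * 4 = 8
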

View File

@ -35,7 +35,6 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.NaiveSortOperatorFactory;
@ -63,12 +62,9 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
@Override
public QueryDefinition makeQueryDefinition(
String queryId,
QueryKitSpec queryKitSpec,
WindowOperatorQuery originalQuery,
QueryKit<Query<?>> queryKit,
ShuffleSpecFactory resultShuffleSpecFactory,
int maxWorkerCount,
int targetPartitionsPerWorker,
int minStageNumber
)
{
@ -90,22 +86,22 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
log.info("Created operatorList with operator factories: [%s]", operatorList);
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
queryKitSpec,
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
false
);
ShuffleSpec nextShuffleSpec =
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount * targetPartitionsPerWorker);
final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(
operatorList.get(0),
queryKitSpec.getNumPartitionsForShuffle()
);
final QueryDefinitionBuilder queryDefBuilder =
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec);
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
@ -133,7 +129,7 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
StageDefinition.builder(firstStageNumber)
.inputs(new StageInputSpec(firstStageNumber - 1))
.signature(finalWindowStageRowSignature)
.maxWorkerCount(maxWorkerCount)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(finalWindowStageShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
@ -196,7 +192,7 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
nextShuffleSpec =
findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount * targetPartitionsPerWorker);
findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle());
if (nextShuffleSpec == null) {
stageRowSignature = intermediateSignature;
} else {
@ -233,7 +229,7 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
.signature(stageRowSignature)
.maxWorkerCount(maxWorkerCount)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,

View File

@ -34,12 +34,12 @@ import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.DataSourcePlan;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.query.DimensionComparisonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.AlwaysHavingSpec;
@ -66,28 +66,22 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
@Override
public QueryDefinition makeQueryDefinition(
final String queryId,
final QueryKitSpec queryKitSpec,
final GroupByQuery originalQuery,
final QueryKit<Query<?>> queryKit,
final ShuffleSpecFactory resultShuffleSpecFactory,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber
)
{
validateQuery(originalQuery);
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId());
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
queryKitSpec,
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
false
);
@ -144,7 +138,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
shuffleSpecFactoryPreAggregation =
intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition()
: ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker);
: ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle());
if (doLimitOrOffset) {
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
@ -169,7 +163,10 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
.signature(intermediateSignature)
.shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true))
.maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount)
.maxWorkerCount(
dataSourcePlan.isSingleWorker()
? 1
: queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs()))
.processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun))
);
@ -189,7 +186,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(resultSignature)
.maxWorkerCount(maxWorkerCount)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
@ -390,7 +387,10 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
for (final OrderByColumnSpec column : defaultLimitSpec.getColumns()) {
final Optional<ColumnType> type = resultSignature.getColumnType(column.getDimension());
if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(type.get().getType(), column.getDimensionComparator())) {
if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(
type.get().getType(),
column.getDimensionComparator()
)) {
throw new ISE(
"Must use natural comparator for column [%s] of type [%s]",
column.getDimension(),

View File

@ -33,13 +33,13 @@ import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.DataSourcePlan;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.Query;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -86,26 +86,20 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
// partition without a ClusterBy, we don't need to necessarily create it via the resultShuffleSpecFactory provided
@Override
public QueryDefinition makeQueryDefinition(
final String queryId,
final QueryKitSpec queryKitSpec,
final ScanQuery originalQuery,
final QueryKit<Query<?>> queryKit,
final ShuffleSpecFactory resultShuffleSpecFactory,
final int maxWorkerCount,
final int targetPartitionsPerWorker,
final int minStageNumber
)
{
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId());
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
queryKitSpec,
originalQuery.context(),
originalQuery.getDataSource(),
originalQuery.getQuerySegmentSpec(),
originalQuery.getFilter(),
null,
maxWorkerCount,
targetPartitionsPerWorker,
minStageNumber,
false
);
@ -179,7 +173,10 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
.shuffleSpec(scanShuffleSpec)
.signature(signatureToUse)
.maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount)
.maxWorkerCount(
dataSourcePlan.isSingleWorker()
? 1
: queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs()))
.processorFactory(new ScanQueryFrameProcessorFactory(queryToRun))
);

View File

@ -60,7 +60,10 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
@ -273,6 +276,23 @@ public class MSQTestControllerContext implements ControllerContext
return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000));
}
@Override
public QueryKitSpec makeQueryKitSpec(
final QueryKit<Query<?>> queryKit,
final String queryId,
final MSQSpec querySpec,
final ControllerQueryKernelConfig queryKernelConfig
)
{
return new QueryKitSpec(
queryKit,
queryId,
querySpec.getTuningConfig().getMaxNumWorkers(),
querySpec.getTuningConfig().getMaxNumWorkers(),
1
);
}
@Override
public void emitMetric(String metric, Number value)
{
@ -341,10 +361,4 @@ public class MSQTestControllerContext implements ControllerContext
{
return new MSQTestWorkerClient(inMemoryWorkers);
}
@Override
public int defaultTargetPartitionsPerWorker()
{
return 1;
}
}