MSQ: Include worker context maps in WorkOrders. (#17076)

* MSQ: Include worker context maps in WorkOrders.

This provides a mechanism to send contexts to workers in long-lived,
shared JVMs that are not part of the task system.

* Style, coverage.
Gian Merlino, 2024-09-17 01:37:21 -07:00 (committed by GitHub)
parent bb1c3c1749
commit 2e4d596d82
13 changed files with 408 additions and 95 deletions
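Taken together, the changes below work like this: the controller builds a worker context map into its ControllerQueryKernelConfig, the kernel copies that map into every WorkOrder, and workers read query context from the WorkOrder rather than from a task. A minimal sketch of the controller side, using only the builder API added in this commit (the specific keys and values are illustrative, not taken from the patch):

    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
    import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
    import org.apache.druid.msq.util.MultiStageQueryContext;

    // Any keys placed in workerContextMap surface on workers via
    // WorkOrder#getWorkerContext(), with no task context required.
    final ControllerQueryKernelConfig config =
        ControllerQueryKernelConfig
            .builder()
            .maxRetainedPartitionSketchBytes(100_000_000)
            .maxConcurrentStages(1)
            .destination(DurableStorageMSQDestination.instance())
            .workerContextMap(
                ImmutableMap.of(
                    MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 1,
                    MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, false
                )
            )
            .build();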

ControllerContext.java

@@ -28,7 +28,6 @@ import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.querykit.QueryKit;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -43,7 +42,7 @@ public interface ControllerContext
   /**
    * Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
    */
-  ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef);
+  ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec);

   /**
    * Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
@@ -88,7 +87,7 @@ public interface ControllerContext
    *
    * @param queryId               query ID
    * @param querySpec             query spec
-   * @param queryKernelConfig     config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)}
+   * @param queryKernelConfig     config from {@link #queryKernelConfig(String, MSQSpec)}
    * @param workerFailureListener listener that receives callbacks when workers fail
    */
   WorkerManager newWorkerManager(

ControllerImpl.java

@@ -562,8 +562,8 @@ public class ControllerImpl implements Controller
   private QueryDefinition initializeQueryDefAndState(final Closer closer)
   {
     this.selfDruidNode = context.selfNode();
-    this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient());
-    closer.register(netClient);
+    this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient()));
+    this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);

     final QueryContext queryContext = querySpec.getQuery().context();
     final QueryDefinition queryDef = makeQueryDefinition(
@@ -594,7 +594,6 @@ public class ControllerImpl implements Controller
     QueryValidator.validateQueryDef(queryDef);
     queryDefRef.set(queryDef);
-    queryKernelConfig = context.queryKernelConfig(querySpec, queryDef);

     workerManager = context.newWorkerManager(
         queryId,
         querySpec,

WorkerImpl.java

@@ -48,7 +48,6 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
 import org.apache.druid.msq.counters.CounterTracker;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -388,7 +387,6 @@ public class WorkerImpl implements Worker
     final InputChannelFactory inputChannelFactory =
         makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser);

-    final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty();
     final boolean includeAllCounters = context.includeAllCounters();
     final RunWorkOrder runWorkOrder = new RunWorkOrder(
         workOrder,
@@ -402,8 +400,8 @@ public class WorkerImpl implements Worker
         context,
         frameContext,
         makeRunWorkOrderListener(workOrder, controllerClient, criticalWarningCodes, maxVerboseParseExceptions),
-        MultiStageQueryContext.isReindex(queryContext),
-        MultiStageQueryContext.removeNullBytes(queryContext)
+        MultiStageQueryContext.isReindex(workOrder.getWorkerContext()),
+        MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext())
     );

     // Set up processorCloser (called when processing is done).
@@ -560,6 +558,13 @@ public class WorkerImpl implements Worker
     return getOrCreateStageOutputHolder(stageId, partitionNumber).readRemotelyFrom(offset);
   }

+  /**
+   * Accept a new {@link WorkOrder} for execution.
+   *
+   * For backwards-compatibility purposes, this method populates {@link WorkOrder#getOutputChannelMode()}
+   * and {@link WorkOrder#getWorkerContext()} if the controller did not set them. (They are there for newer
+   * controllers, but not older ones.)
+   */
   @Override
   public void postWorkOrder(final WorkOrder workOrder)
   {
@@ -577,28 +582,11 @@ public class WorkerImpl implements Worker
       );
     }

-    final OutputChannelMode outputChannelMode;
-
-    // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder.
-    // (It will be there for newer controllers; this is a backwards-compatibility thing.)
-    if (workOrder.hasOutputChannelMode()) {
-      outputChannelMode = workOrder.getOutputChannelMode();
-    } else {
-      final MSQSelectDestination selectDestination =
-          task != null
-          ? MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext()))
-          : MSQSelectDestination.TASKREPORT;
-
-      outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
-          workOrder.getQueryDefinition(),
-          workOrder.getStageNumber(),
-          selectDestination,
-          task != null && MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())),
-          false
-      );
-    }
-
-    final WorkOrder workOrderToUse = workOrder.withOutputChannelMode(outputChannelMode);
+    final WorkOrder workOrderToUse = makeWorkOrderToUse(
+        workOrder,
+        task != null && task.getContext() != null ? QueryContext.of(task.getContext()) : QueryContext.empty()
+    );

     kernelManipulationQueue.add(
         kernelHolders ->
             kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse))
@@ -1009,6 +997,48 @@ public class WorkerImpl implements Worker
     );
   }

+  /**
+   * Returns a work order based on the provided "originalWorkOrder", but where {@link WorkOrder#hasOutputChannelMode()}
+   * and {@link WorkOrder#hasWorkerContext()} are both true. If the original work order didn't have those fields, they
+   * are populated from the "taskContext". Otherwise the "taskContext" is ignored.
+   *
+   * This method can be removed once we can rely on these fields always being set in the WorkOrder.
+   * (They will be there for newer controllers; this is a backwards-compatibility method.)
+   *
+   * @param originalWorkOrder work order from controller
+   * @param taskContext       task context
+   */
+  static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder, @Nullable final QueryContext taskContext)
+  {
+    // This condition can be removed once we can rely on QueryContext always being in the WorkOrder.
+    // (It will be there for newer controllers; this is a backwards-compatibility thing.)
+    final QueryContext queryContext;
+    if (originalWorkOrder.hasWorkerContext()) {
+      queryContext = originalWorkOrder.getWorkerContext();
+    } else if (taskContext != null) {
+      queryContext = taskContext;
+    } else {
+      queryContext = QueryContext.empty();
+    }
+
+    // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder.
+    // (It will be there for newer controllers; this is a backwards-compatibility thing.)
+    final OutputChannelMode outputChannelMode;
+    if (originalWorkOrder.hasOutputChannelMode()) {
+      outputChannelMode = originalWorkOrder.getOutputChannelMode();
+    } else {
+      outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
+          originalWorkOrder.getQueryDefinition(),
+          originalWorkOrder.getStageNumber(),
+          MultiStageQueryContext.getSelectDestination(queryContext),
+          MultiStageQueryContext.isDurableStorageEnabled(queryContext),
+          false
+      );
+    }
+
+    return originalWorkOrder.withWorkerContext(queryContext).withOutputChannelMode(outputChannelMode);
+  }
+
   /**
    * Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
    */
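In short, makeWorkOrderToUse gives the WorkOrder's own fields precedence and falls back to the task context only for orders sent by older controllers. A hedged sketch of that fallback path ("legacyOrder" is a hypothetical WorkOrder with neither field set, the case exercised by the new WorkerImplTest):

    // Sketch: upgrading an order from an older controller. The worker context comes
    // from the task context, and the output channel mode is derived from that context.
    final QueryContext taskContext = QueryContext.of(
        ImmutableMap.of(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true)
    );
    final WorkOrder upgraded = WorkerImpl.makeWorkOrderToUse(legacyOrder, taskContext);
    // upgraded.hasWorkerContext() and upgraded.hasOutputChannelMode() now both return true.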

IndexerControllerContext.java

@@ -46,7 +46,7 @@ import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DruidMetrics;
@@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class IndexerControllerContext implements ControllerContext
 {
+  public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
+
   private static final Logger log = new Logger(IndexerControllerContext.class);

   private final MSQControllerTask task;
@@ -96,21 +98,21 @@ public class IndexerControllerContext implements ControllerContext
   @Override
   public ControllerQueryKernelConfig queryKernelConfig(
-      final MSQSpec querySpec,
-      final QueryDefinition queryDef
+      final String queryId,
+      final MSQSpec querySpec
   )
   {
     final ControllerMemoryParameters memoryParameters =
         ControllerMemoryParameters.createProductionInstance(
             memoryIntrospector,
-            queryDef.getFinalStageDefinition().getMaxWorkerCount()
+            querySpec.getTuningConfig().getMaxNumWorkers()
         );

     final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters);

     log.debug(
         "Query[%s] using %s[%s], %s[%s], %s[%s].",
-        queryDef.getQueryId(),
+        queryId,
         MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
         config.isDurableStorage(),
         MultiStageQueryContext.CTX_FAULT_TOLERANCE,
@@ -210,7 +212,7 @@ public class IndexerControllerContext implements ControllerContext
   }

   /**
-   * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests.
+   * Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used in tests.
    */
   public static ControllerQueryKernelConfig makeQueryKernelConfig(
       final MSQSpec querySpec,
@@ -218,7 +220,8 @@ public class IndexerControllerContext implements ControllerContext
   )
   {
     final QueryContext queryContext = querySpec.getQuery().context();
-    final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    final int maxConcurrentStages =
+        MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext, DEFAULT_MAX_CONCURRENT_STAGES);
     final boolean isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
     final boolean isDurableStorageEnabled;
@@ -256,9 +259,44 @@ public class IndexerControllerContext implements ControllerContext
         .destination(querySpec.getDestination())
         .maxConcurrentStages(maxConcurrentStages)
         .maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes())
+        .workerContextMap(makeWorkerContextMap(querySpec, isDurableStorageEnabled, maxConcurrentStages))
         .build();
   }

+  /**
+   * Helper method for {@link #makeQueryKernelConfig} and {@link #makeTaskContext}. Makes the worker context map,
+   * i.e., the map that will become {@link WorkOrder#getWorkerContext()}.
+   */
+  public static Map<String, Object> makeWorkerContextMap(
+      final MSQSpec querySpec,
+      final boolean durableStorageEnabled,
+      final int maxConcurrentStages
+  )
+  {
+    final QueryContext queryContext = querySpec.getQuery().context();
+    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext);
+    final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(queryContext);
+    final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext);
+    final ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+
+    builder
+        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, durableStorageEnabled)
+        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
+        .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
+        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, maxConcurrentStages)
+        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
+        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters);
+
+    if (querySpec.getDestination().toSelectDestination() != null) {
+      builder.put(
+          MultiStageQueryContext.CTX_SELECT_DESTINATION,
+          querySpec.getDestination().toSelectDestination().getName()
+      );
+    }
+
+    return builder.build();
+  }
+
   /**
    * Helper method for {@link #newWorkerManager}, split out to be used in tests.
    *
@@ -271,17 +309,16 @@ public class IndexerControllerContext implements ControllerContext
   )
   {
     final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
-    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
-    final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
-    final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());

-    taskContextOverridesBuilder
-        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage())
-        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
-        .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
-        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages())
-        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
-        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters);
+    // Put worker context into the task context. That way, workers can get these context keys either from
+    // WorkOrder#getContext or Task#getContext.
+    taskContextOverridesBuilder.putAll(
+        makeWorkerContextMap(
+            querySpec,
+            queryKernelConfig.isDurableStorage(),
+            queryKernelConfig.getMaxConcurrentStages()
+        )
+    );

     // Put the lookup loading info in the task context to facilitate selective loading of lookups.
     if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) {
@@ -297,13 +334,6 @@ public class IndexerControllerContext implements ControllerContext
       );
     }

-    if (querySpec.getDestination().toSelectDestination() != null) {
-      taskContextOverridesBuilder.put(
-          MultiStageQueryContext.CTX_SELECT_DESTINATION,
-          querySpec.getDestination().toSelectDestination().getName()
-      );
-    }
-
     // propagate the controller's tags to the worker task for enhanced metrics reporting
     @SuppressWarnings("unchecked")
     Map<String, Object> tags = (Map<String, Object>) controllerTaskContext.get(DruidMetrics.TAGS);
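Because the same map is merged into both the worker task context and the WorkOrder, a worker can resolve a key from either place. A hypothetical helper illustrating the dual-path read (the method name and structure are assumptions, not part of the patch):

    // Sketch: prefer the WorkOrder's context, falling back to the task's context.
    static boolean isDurableShuffle(final WorkOrder order, final MSQWorkerTask task)
    {
      final QueryContext ctx = order.hasWorkerContext()
                               ? order.getWorkerContext()
                               : QueryContext.of(task.getContext());
      return MultiStageQueryContext.isDurableStorageEnabled(ctx);
    }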

IndexerWorkerContext.java

@@ -116,7 +116,10 @@ public class IndexerWorkerContext implements WorkerContext
     this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;

     final QueryContext queryContext = QueryContext.of(task.getContext());
-    this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
+        queryContext,
+        IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
+    );
     this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext);
   }

WorkOrder.java

@@ -26,9 +26,11 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.msq.exec.ControllerClient;
 import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.query.QueryContext;

 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;

 /**
@@ -51,9 +53,18 @@ public class WorkOrder
   @Nullable
   private final List<String> workerIds;

+  /**
+   * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons.
+   */
   @Nullable
   private final OutputChannelMode outputChannelMode;

+  /**
+   * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons.
+   */
+  @Nullable
+  private final QueryContext workerContext;
+
   @JsonCreator
   @SuppressWarnings("rawtypes")
   public WorkOrder(
@@ -63,7 +74,8 @@ public class WorkOrder
       @JsonProperty("input") final List<InputSlice> workerInputs,
       @JsonProperty("extra") @Nullable final ExtraInfoHolder extraInfoHolder,
       @JsonProperty("workers") @Nullable final List<String> workerIds,
-      @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode
+      @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode,
+      @JsonProperty("context") @Nullable final Map<String, Object> workerContext
   )
   {
     this.queryDefinition = Preconditions.checkNotNull(queryDefinition, "queryDefinition");
@@ -73,6 +85,7 @@ public class WorkOrder
     this.extraInfoHolder = extraInfoHolder;
     this.workerIds = workerIds;
     this.outputChannelMode = outputChannelMode;
+    this.workerContext = workerContext != null ? QueryContext.of(workerContext) : null;
   }

   @JsonProperty("query")
@@ -124,6 +137,10 @@ public class WorkOrder
     return outputChannelMode != null;
   }

+  /**
+   * Retrieves the output channel mode set by the controller. Null means the controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
   @Nullable
   @JsonProperty("output")
   @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -132,6 +149,29 @@ public class WorkOrder
     return outputChannelMode;
   }

+  public boolean hasWorkerContext()
+  {
+    return workerContext != null;
+  }
+
+  /**
+   * Retrieves the query context set by the controller. Null means the controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
+  @Nullable
+  public QueryContext getWorkerContext()
+  {
+    return workerContext;
+  }
+
+  @Nullable
+  @JsonProperty("context")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, Object> getContextForSerialization()
+  {
+    return workerContext != null ? workerContext.asMap() : null;
+  }
+
   @Nullable
   public Object getExtraInfo()
   {
@@ -155,7 +195,26 @@ public class WorkOrder
         workerInputs,
         extraInfoHolder,
         workerIds,
-        newOutputChannelMode
+        newOutputChannelMode,
+        workerContext != null ? workerContext.asMap() : null
+      );
+    }
+  }
+
+  public WorkOrder withWorkerContext(final QueryContext newContext)
+  {
+    if (Objects.equals(newContext, this.workerContext)) {
+      return this;
+    } else {
+      return new WorkOrder(
+          queryDefinition,
+          stageNumber,
+          workerNumber,
+          workerInputs,
+          extraInfoHolder,
+          workerIds,
+          outputChannelMode,
+          newContext.asMap()
       );
     }
   }

@@ -176,7 +235,8 @@ public class WorkOrder
          && Objects.equals(workerInputs, workOrder.workerInputs)
          && Objects.equals(extraInfoHolder, workOrder.extraInfoHolder)
          && Objects.equals(workerIds, workOrder.workerIds)
-          && Objects.equals(outputChannelMode, workOrder.outputChannelMode);
+          && Objects.equals(outputChannelMode, workOrder.outputChannelMode)
+          && Objects.equals(workerContext, workOrder.workerContext);
   }

   @Override
@@ -189,7 +249,8 @@ public class WorkOrder
         workerInputs,
         extraInfoHolder,
         workerIds,
-        outputChannelMode
+        outputChannelMode,
+        workerContext
     );
   }

@@ -204,6 +265,7 @@ public class WorkOrder
            ", extraInfoHolder=" + extraInfoHolder +
            ", workerIds=" + workerIds +
            ", outputChannelMode=" + outputChannelMode +
+           ", context=" + workerContext +
            '}';
   }
 }
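A quick sketch of what the new field enables at the API level. Constructor arguments follow the @JsonCreator order above; both "output" and "context" are omitted from JSON when null, so old and new controllers and workers interoperate. Here "queryDef" and "inputs" are hypothetical stand-ins:

    final WorkOrder order = new WorkOrder(
        queryDef,                   // query definition (hypothetical)
        0,                          // stage number
        0,                          // worker number
        inputs,                     // worker input slices (hypothetical)
        null,                       // extra info
        null,                       // worker IDs
        OutputChannelMode.MEMORY,
        ImmutableMap.of(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, true)
    );

    order.hasWorkerContext();   // true
    order.getWorkerContext().getBoolean(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, false);   // true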

ControllerQueryKernel.java

@@ -302,7 +302,8 @@ public class ControllerQueryKernel
         workerInputs.inputsForWorker(workerNumber),
         extraInfoHolder,
         config.getWorkerIds(),
-        outputChannelMode
+        outputChannelMode,
+        config.getWorkerContextMap()
     );

     QueryValidator.validateWorkOrder(workOrder);

ControllerQueryKernelConfig.java

@@ -21,9 +21,12 @@ package org.apache.druid.msq.kernel.controller;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.kernel.WorkOrder;

 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;

 /**
@@ -37,22 +40,22 @@ public class ControllerQueryKernelConfig
   private final boolean durableStorage;
   private final boolean faultTolerance;
   private final MSQDestination destination;

   @Nullable
-  private final String controllerId;
+  private final String controllerHost;

   @Nullable
   private final List<String> workerIds;

+  private final Map<String, Object> workerContextMap;
+
-  private ControllerQueryKernelConfig(
+  ControllerQueryKernelConfig(
       int maxRetainedPartitionSketchBytes,
       int maxConcurrentStages,
       boolean pipeline,
       boolean durableStorage,
       boolean faultTolerance,
       MSQDestination destination,
-      @Nullable String controllerId,
-      @Nullable List<String> workerIds
+      @Nullable String controllerHost,
+      @Nullable List<String> workerIds,
+      Map<String, Object> workerContextMap
   )
   {
     if (maxRetainedPartitionSketchBytes <= 0) {
@@ -85,8 +88,9 @@ public class ControllerQueryKernelConfig
     this.durableStorage = durableStorage;
     this.faultTolerance = faultTolerance;
     this.destination = destination;
-    this.controllerId = controllerId;
+    this.controllerHost = controllerHost;
     this.workerIds = workerIds;
+    this.workerContextMap = workerContextMap;
   }

   public static Builder builder()
@@ -130,6 +134,14 @@ public class ControllerQueryKernelConfig
     return workerIds;
   }

+  /**
+   * Map to include in {@link WorkOrder}, as {@link WorkOrder#getWorkerContext()}.
+   */
+  public Map<String, Object> getWorkerContextMap()
+  {
+    return workerContextMap;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -145,8 +157,10 @@ public class ControllerQueryKernelConfig
            && pipeline == that.pipeline
            && durableStorage == that.durableStorage
            && faultTolerance == that.faultTolerance
-           && Objects.equals(controllerId, that.controllerId)
-           && Objects.equals(workerIds, that.workerIds);
+           && Objects.equals(destination, that.destination)
+           && Objects.equals(controllerHost, that.controllerHost)
+           && Objects.equals(workerIds, that.workerIds)
+           && Objects.equals(workerContextMap, that.workerContextMap);
   }

   @Override
@@ -158,8 +172,10 @@ public class ControllerQueryKernelConfig
         pipeline,
         durableStorage,
         faultTolerance,
-        controllerId,
-        workerIds
+        destination,
+        controllerHost,
+        workerIds,
+        workerContextMap
     );
   }
@@ -171,9 +187,11 @@ public class ControllerQueryKernelConfig
            ", maxConcurrentStages=" + maxConcurrentStages +
            ", pipeline=" + pipeline +
            ", durableStorage=" + durableStorage +
-           ", faultTolerant=" + faultTolerance +
-           ", controllerId='" + controllerId + '\'' +
+           ", faultTolerance=" + faultTolerance +
+           ", destination=" + destination +
+           ", controllerHost='" + controllerHost + '\'' +
            ", workerIds=" + workerIds +
+           ", workerContextMap=" + workerContextMap +
            '}';
   }
@@ -185,8 +203,9 @@ public class ControllerQueryKernelConfig
     private boolean durableStorage;
     private boolean faultTolerant;
     private MSQDestination destination;
-    private String controllerId;
+    private String controllerHost;
     private List<String> workerIds;
+    private Map<String, Object> workerContextMap = Collections.emptyMap();

     /**
      * Use {@link #builder()}.
@@ -231,9 +250,9 @@ public class ControllerQueryKernelConfig
       return this;
     }

-    public Builder controllerId(final String controllerId)
+    public Builder controllerHost(final String controllerHost)
     {
-      this.controllerId = controllerId;
+      this.controllerHost = controllerHost;
       return this;
     }

@@ -243,6 +262,12 @@ public class ControllerQueryKernelConfig
       return this;
     }

+    public Builder workerContextMap(final Map<String, Object> workerContextMap)
+    {
+      this.workerContextMap = workerContextMap;
+      return this;
+    }
+
     public ControllerQueryKernelConfig build()
     {
       return new ControllerQueryKernelConfig(
@@ -252,8 +277,9 @@ public class ControllerQueryKernelConfig
           durableStorage,
           faultTolerant,
           destination,
-          controllerId,
-          workerIds
+          controllerHost,
+          workerIds,
+          workerContextMap
       );
     }
   }

MultiStageQueryContext.java

@@ -119,7 +119,6 @@ public class MultiStageQueryContext
   public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE = SegmentSource.NONE;

   public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages";
-  public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
   public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
   private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false;
   public static final String CTX_SELECT_DESTINATION = "selectDestination";
@@ -206,11 +205,14 @@ public class MultiStageQueryContext
     );
   }

-  public static int getMaxConcurrentStages(final QueryContext queryContext)
+  public static int getMaxConcurrentStagesWithDefault(
+      final QueryContext queryContext,
+      final int defaultMaxConcurrentStages
+  )
   {
     return queryContext.getInt(
         CTX_MAX_CONCURRENT_STAGES,
-        DEFAULT_MAX_CONCURRENT_STAGES
+        defaultMaxConcurrentStages
     );
   }

@@ -336,16 +338,6 @@ public class MultiStageQueryContext
     );
   }

-  @Nullable
-  public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext)
-  {
-    return QueryContexts.getAsEnum(
-        CTX_SELECT_DESTINATION,
-        queryContext.getString(CTX_SELECT_DESTINATION),
-        MSQSelectDestination.class
-    );
-  }
-
   public static int getRowsInMemory(final QueryContext queryContext)
   {
     return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
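With the class-level default removed, each runtime now states its own default. For instance, the task-based runtime passes its own constant, as seen in the IndexerWorkerContext hunk above; an embedded runtime could choose differently. A brief sketch ("queryContext" is whatever context the caller holds):

    // Same call, per-runtime default (1 for the task runtime):
    final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
        queryContext,
        IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
    );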

QueryValidatorTest.java

@@ -108,6 +108,7 @@ public class QueryValidatorTest
         Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles
         null,
         null,
+        null,
         null
     );

@@ -125,7 +126,7 @@ public class QueryValidatorTest
     QueryValidator.validateWorkOrder(workOrder);
   }

-  private static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
+  public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
   {
     QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());

WorkerImplTest.java (new file)

@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.msq.exec;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.kernel.QueryValidatorTest;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;

public class WorkerImplTest
{
  @Test
  public void test_makeWorkOrderToUse_nothingMissing()
  {
    final WorkOrder workOrder = new WorkOrder(
        QueryValidatorTest.createQueryDefinition(10, 2),
        0,
        0,
        Collections.singletonList(() -> 1),
        null,
        null,
        OutputChannelMode.MEMORY,
        ImmutableMap.of("foo", "bar")
    );

    Assert.assertSame(
        workOrder,
        WorkerImpl.makeWorkOrderToUse(
            workOrder,
            QueryContext.of(ImmutableMap.of("foo", "baz")) /* Conflicts with workOrder context; should be ignored */
        )
    );
  }

  @Test
  public void test_makeWorkOrderToUse_missingOutputChannelModeAndWorkerContext()
  {
    final Map<String, Object> taskContext =
        ImmutableMap.of("foo", "bar", MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true);

    final WorkOrder workOrder = new WorkOrder(
        QueryValidatorTest.createQueryDefinition(10, 2),
        1,
        2,
        Collections.singletonList(() -> 1),
        null,
        null,
        null,
        null
    );

    Assert.assertEquals(
        new WorkOrder(
            workOrder.getQueryDefinition(),
            workOrder.getStageNumber(),
            workOrder.getWorkerNumber(),
            workOrder.getInputs(),
            null,
            null,
            OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE,
            taskContext
        ),
        WorkerImpl.makeWorkOrderToUse(workOrder, QueryContext.of(taskContext))
    );
  }
}

ControllerQueryKernelConfigTest.java (new file)

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.msq.kernel.controller;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;

public class ControllerQueryKernelConfigTest
{
  @Test
  public void testBuilder()
  {
    int maxRetainedPartitionSketchBytes = 1;
    int maxConcurrentStages = 2;
    boolean pipeline = false;
    boolean durableStorage = true;
    boolean faultTolerance = true;
    MSQDestination destination = DurableStorageMSQDestination.instance();
    String controllerHost = "controllerHost";
    List<String> workerIds = ImmutableList.of("worker1", "worker2");
    Map<String, Object> workerContextMap = ImmutableMap.of("foo", "bar");

    final ControllerQueryKernelConfig config1 = new ControllerQueryKernelConfig(
        maxRetainedPartitionSketchBytes,
        maxConcurrentStages,
        pipeline,
        durableStorage,
        faultTolerance,
        destination,
        controllerHost,
        workerIds,
        workerContextMap
    );

    final ControllerQueryKernelConfig config2 = ControllerQueryKernelConfig
        .builder()
        .maxRetainedPartitionSketchBytes(maxRetainedPartitionSketchBytes)
        .maxConcurrentStages(maxConcurrentStages)
        .pipeline(pipeline)
        .durableStorage(durableStorage)
        .faultTolerance(faultTolerance)
        .destination(destination)
        .controllerHost(controllerHost)
        .workerIds(workerIds)
        .workerContextMap(workerContextMap)
        .build();

    Assert.assertEquals(config1, config2);
  }

  @Test
  public void testEquals()
  {
    EqualsVerifier.forClass(ControllerQueryKernelConfig.class)
                  .usingGetClass()
                  .verify();
  }
}

MSQTestControllerContext.java

@@ -59,7 +59,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -269,7 +268,7 @@ public class MSQTestControllerContext implements ControllerContext
   };

   @Override
-  public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef)
+  public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec)
   {
     return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000));
   }