diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index bc449d14120..44b22af3666 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -28,7 +28,6 @@ import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; -import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -43,7 +42,7 @@ public interface ControllerContext /** * Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}. */ - ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef); + ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec); /** * Callback from the controller implementation to "register" the controller. 
Used in the indexing task implementation @@ -88,7 +87,7 @@ public interface ControllerContext * * @param queryId query ID * @param querySpec query spec - * @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)} + * @param queryKernelConfig config from {@link #queryKernelConfig(String, MSQSpec)} * @param workerFailureListener listener that receives callbacks when workers fail */ WorkerManager newWorkerManager( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 2a29d40b9fe..8eda56ad857 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -562,8 +562,8 @@ public class ControllerImpl implements Controller private QueryDefinition initializeQueryDefAndState(final Closer closer) { this.selfDruidNode = context.selfNode(); - this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient()); - closer.register(netClient); + this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient())); + this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec); final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( @@ -594,7 +594,6 @@ public class ControllerImpl implements Controller QueryValidator.validateQueryDef(queryDef); queryDefRef.set(queryDef); - queryKernelConfig = context.queryKernelConfig(querySpec, queryDef); workerManager = context.newWorkerManager( queryId, querySpec, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 74e3850c6e9..702302f7ea1 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -48,7 +48,6 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree; import org.apache.druid.msq.counters.CounterTracker; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.MSQWorkerTask; -import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.MSQErrorReport; @@ -388,7 +387,6 @@ public class WorkerImpl implements Worker final InputChannelFactory inputChannelFactory = makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser); - final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty(); final boolean includeAllCounters = context.includeAllCounters(); final RunWorkOrder runWorkOrder = new RunWorkOrder( workOrder, @@ -402,8 +400,8 @@ public class WorkerImpl implements Worker context, frameContext, makeRunWorkOrderListener(workOrder, controllerClient, criticalWarningCodes, maxVerboseParseExceptions), - MultiStageQueryContext.isReindex(queryContext), - MultiStageQueryContext.removeNullBytes(queryContext) + MultiStageQueryContext.isReindex(workOrder.getWorkerContext()), + MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext()) ); // Set up processorCloser (called when processing is done). @@ -560,6 +558,13 @@ public class WorkerImpl implements Worker return getOrCreateStageOutputHolder(stageId, partitionNumber).readRemotelyFrom(offset); } + /** + * Accept a new {@link WorkOrder} for execution. 
+ * + * For backwards-compatibility purposes, this method populates {@link WorkOrder#getOutputChannelMode()} + * and {@link WorkOrder#getWorkerContext()} if the controller did not set them. (They are there for newer controllers, + * but not older ones.) + */ @Override public void postWorkOrder(final WorkOrder workOrder) { @@ -577,28 +582,11 @@ public class WorkerImpl implements Worker ); } - final OutputChannelMode outputChannelMode; + final WorkOrder workOrderToUse = makeWorkOrderToUse( + workOrder, + task != null && task.getContext() != null ? QueryContext.of(task.getContext()) : QueryContext.empty() + ); - // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder. - // (It will be there for newer controllers; this is a backwards-compatibility thing.) - if (workOrder.hasOutputChannelMode()) { - outputChannelMode = workOrder.getOutputChannelMode(); - } else { - final MSQSelectDestination selectDestination = - task != null - ? MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext())) - : MSQSelectDestination.TASKREPORT; - - outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode( - workOrder.getQueryDefinition(), - workOrder.getStageNumber(), - selectDestination, - task != null && MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())), - false - ); - } - - final WorkOrder workOrderToUse = workOrder.withOutputChannelMode(outputChannelMode); kernelManipulationQueue.add( kernelHolders -> kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse)) @@ -1009,6 +997,48 @@ public class WorkerImpl implements Worker ); } + /** + * Returns a work order based on the provided "originalWorkOrder", but where {@link WorkOrder#hasOutputChannelMode()} + * and {@link WorkOrder#hasWorkerContext()} are both true. If the original work order didn't have those fields, they + * are populated from the "taskContext". Otherwise the "taskContext" is ignored. 
+ * + * This method can be removed once we can rely on these fields always being set in the WorkOrder. + * (They will be there for newer controllers; this is a backwards-compatibility method.) + * + * @param originalWorkOrder work order from controller + * @param taskContext task context + */ + static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder, @Nullable final QueryContext taskContext) + { + // This condition can be removed once we can rely on QueryContext always being in the WorkOrder. + // (It will be there for newer controllers; this is a backwards-compatibility thing.) + final QueryContext queryContext; + if (originalWorkOrder.hasWorkerContext()) { + queryContext = originalWorkOrder.getWorkerContext(); + } else if (taskContext != null) { + queryContext = taskContext; + } else { + queryContext = QueryContext.empty(); + } + + // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder. + // (It will be there for newer controllers; this is a backwards-compatibility thing.) + final OutputChannelMode outputChannelMode; + if (originalWorkOrder.hasOutputChannelMode()) { + outputChannelMode = originalWorkOrder.getOutputChannelMode(); + } else { + outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode( + originalWorkOrder.getQueryDefinition(), + originalWorkOrder.getStageNumber(), + MultiStageQueryContext.getSelectDestination(queryContext), + MultiStageQueryContext.isDurableStorageEnabled(queryContext), + false + ); + } + + return originalWorkOrder.withWorkerContext(queryContext).withOutputChannelMode(outputChannelMode); + } + /** * Log (at DEBUG level) a string explaining the status of all work assigned to this worker. 
*/ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 42808f64742..589b17d632b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -46,7 +46,7 @@ import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.table.TableInputSpecSlicer; -import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DruidMetrics; @@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit; */ public class IndexerControllerContext implements ControllerContext { + public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1; + private static final Logger log = new Logger(IndexerControllerContext.class); private final MSQControllerTask task; @@ -96,21 +98,21 @@ public class IndexerControllerContext implements ControllerContext @Override public ControllerQueryKernelConfig queryKernelConfig( - final MSQSpec querySpec, - final QueryDefinition queryDef + final String queryId, + final MSQSpec querySpec ) { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( memoryIntrospector, - queryDef.getFinalStageDefinition().getMaxWorkerCount() + querySpec.getTuningConfig().getMaxNumWorkers() ); final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters); log.debug( "Query[%s] using %s[%s], %s[%s], %s[%s].", - queryDef.getQueryId(), 
+ queryId, MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, config.isDurableStorage(), MultiStageQueryContext.CTX_FAULT_TOLERANCE, @@ -210,7 +212,7 @@ public class IndexerControllerContext implements ControllerContext } /** - * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests. + * Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used in tests. */ public static ControllerQueryKernelConfig makeQueryKernelConfig( final MSQSpec querySpec, @@ -218,7 +220,8 @@ public class IndexerControllerContext implements ControllerContext ) { final QueryContext queryContext = querySpec.getQuery().context(); - final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext); + final int maxConcurrentStages = + MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext, DEFAULT_MAX_CONCURRENT_STAGES); final boolean isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(queryContext); final boolean isDurableStorageEnabled; @@ -256,9 +259,44 @@ public class IndexerControllerContext implements ControllerContext .destination(querySpec.getDestination()) .maxConcurrentStages(maxConcurrentStages) .maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes()) + .workerContextMap(makeWorkerContextMap(querySpec, isDurableStorageEnabled, maxConcurrentStages)) .build(); } + /** + * Helper method for {@link #makeQueryKernelConfig} and {@link #makeTaskContext}. Makes the worker context map, + * i.e., the map that will become {@link WorkOrder#getWorkerContext()}. 
+ */ + public static Map makeWorkerContextMap( + final MSQSpec querySpec, + final boolean durableStorageEnabled, + final int maxConcurrentStages + ) + { + final QueryContext queryContext = querySpec.getQuery().context(); + final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext); + final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(queryContext); + final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); + final ImmutableMap.Builder builder = ImmutableMap.builder(); + + builder + .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, durableStorageEnabled) + .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions) + .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) + .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, maxConcurrentStages) + .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes) + .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters); + + if (querySpec.getDestination().toSelectDestination() != null) { + builder.put( + MultiStageQueryContext.CTX_SELECT_DESTINATION, + querySpec.getDestination().toSelectDestination().getName() + ); + } + + return builder.build(); + } + /** * Helper method for {@link #newWorkerManager}, split out to be used in tests. 
* @@ -271,17 +309,16 @@ public class IndexerControllerContext implements ControllerContext ) { final ImmutableMap.Builder taskContextOverridesBuilder = ImmutableMap.builder(); - final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context()); - final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context()); - final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context()); - taskContextOverridesBuilder - .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage()) - .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions) - .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) - .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()) - .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes) - .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters); + // Put worker context into the task context. That way, workers can get these context keys either from + // WorkOrder#getWorkerContext or Task#getContext. + taskContextOverridesBuilder.putAll( + makeWorkerContextMap( + querySpec, + queryKernelConfig.isDurableStorage(), + queryKernelConfig.getMaxConcurrentStages() + ) + ); // Put the lookup loading info in the task context to facilitate selective loading of lookups. 
if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) { @@ -297,13 +334,6 @@ public class IndexerControllerContext implements ControllerContext ); } - if (querySpec.getDestination().toSelectDestination() != null) { - taskContextOverridesBuilder.put( - MultiStageQueryContext.CTX_SELECT_DESTINATION, - querySpec.getDestination().toSelectDestination().getName() - ); - } - // propagate the controller's tags to the worker task for enhanced metrics reporting @SuppressWarnings("unchecked") Map tags = (Map) controllerTaskContext.get(DruidMetrics.TAGS); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 2a7d91c40af..fbb0bff9556 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -116,7 +116,10 @@ public class IndexerWorkerContext implements WorkerContext this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; final QueryContext queryContext = QueryContext.of(task.getContext()); - this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext); + this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault( + queryContext, + IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES + ); this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java index 0c857870210..2a45605826b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java @@ -26,9 +26,11 @@ import com.google.common.base.Preconditions; import org.apache.druid.msq.exec.ControllerClient; import org.apache.druid.msq.exec.OutputChannelMode; import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.query.QueryContext; import javax.annotation.Nullable; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -51,9 +53,18 @@ public class WorkOrder @Nullable private final List workerIds; + /** + * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons. + */ @Nullable private final OutputChannelMode outputChannelMode; + /** + * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons. + */ + @Nullable + private final QueryContext workerContext; + @JsonCreator @SuppressWarnings("rawtypes") public WorkOrder( @@ -63,7 +74,8 @@ public class WorkOrder @JsonProperty("input") final List workerInputs, @JsonProperty("extra") @Nullable final ExtraInfoHolder extraInfoHolder, @JsonProperty("workers") @Nullable final List workerIds, - @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode + @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode, + @JsonProperty("context") @Nullable final Map workerContext ) { this.queryDefinition = Preconditions.checkNotNull(queryDefinition, "queryDefinition"); @@ -73,6 +85,7 @@ public class WorkOrder this.extraInfoHolder = extraInfoHolder; this.workerIds = workerIds; this.outputChannelMode = outputChannelMode; + this.workerContext = workerContext != null ? QueryContext.of(workerContext) : null; } @JsonProperty("query") @@ -124,6 +137,10 @@ public class WorkOrder return outputChannelMode != null; } + /** + * Retrieves the output channel mode set by the controller. Null means the controller didn't set it, which means + * we're dealing with an older controller. 
+ */ @Nullable @JsonProperty("output") @JsonInclude(JsonInclude.Include.NON_NULL) @@ -132,6 +149,29 @@ public class WorkOrder return outputChannelMode; } + public boolean hasWorkerContext() + { + return workerContext != null; + } + + /** + * Retrieves the query context set by the controller. Null means the controller didn't set it, which means + * we're dealing with an older controller. + */ + @Nullable + public QueryContext getWorkerContext() + { + return workerContext; + } + + @Nullable + @JsonProperty("context") + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getContextForSerialization() + { + return workerContext != null ? workerContext.asMap() : null; + } + @Nullable public Object getExtraInfo() { @@ -155,7 +195,26 @@ public class WorkOrder workerInputs, extraInfoHolder, workerIds, - newOutputChannelMode + newOutputChannelMode, + workerContext != null ? workerContext.asMap() : null + ); + } + } + + public WorkOrder withWorkerContext(final QueryContext newContext) + { + if (Objects.equals(newContext, this.workerContext)) { + return this; + } else { + return new WorkOrder( + queryDefinition, + stageNumber, + workerNumber, + workerInputs, + extraInfoHolder, + workerIds, + outputChannelMode, + newContext.asMap() ); } } @@ -176,7 +235,8 @@ public class WorkOrder && Objects.equals(workerInputs, workOrder.workerInputs) && Objects.equals(extraInfoHolder, workOrder.extraInfoHolder) && Objects.equals(workerIds, workOrder.workerIds) - && Objects.equals(outputChannelMode, workOrder.outputChannelMode); + && Objects.equals(outputChannelMode, workOrder.outputChannelMode) + && Objects.equals(workerContext, workOrder.workerContext); } @Override @@ -189,7 +249,8 @@ public class WorkOrder workerInputs, extraInfoHolder, workerIds, - outputChannelMode + outputChannelMode, + workerContext ); } @@ -204,6 +265,7 @@ public class WorkOrder ", extraInfoHolder=" + extraInfoHolder + ", workerIds=" + workerIds + ", outputChannelMode=" + outputChannelMode + + ", context=" + 
workerContext + '}'; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index b01091f9ad7..62a13326909 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -302,7 +302,8 @@ public class ControllerQueryKernel workerInputs.inputsForWorker(workerNumber), extraInfoHolder, config.getWorkerIds(), - outputChannelMode + outputChannelMode, + config.getWorkerContextMap() ); QueryValidator.validateWorkOrder(workOrder); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java index 5c754aedd4f..f7516c63c92 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java @@ -21,9 +21,12 @@ package org.apache.druid.msq.kernel.controller; import org.apache.druid.java.util.common.IAE; import org.apache.druid.msq.indexing.destination.MSQDestination; +import org.apache.druid.msq.kernel.WorkOrder; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -37,22 +40,22 @@ public class ControllerQueryKernelConfig private final boolean durableStorage; private final boolean faultTolerance; private final MSQDestination destination; - @Nullable - private final String controllerId; - + private final String controllerHost; @Nullable 
private final List workerIds; + private final Map workerContextMap; - private ControllerQueryKernelConfig( + ControllerQueryKernelConfig( int maxRetainedPartitionSketchBytes, int maxConcurrentStages, boolean pipeline, boolean durableStorage, boolean faultTolerance, MSQDestination destination, - @Nullable String controllerId, - @Nullable List workerIds + @Nullable String controllerHost, + @Nullable List workerIds, + Map workerContextMap ) { if (maxRetainedPartitionSketchBytes <= 0) { @@ -85,8 +88,9 @@ public class ControllerQueryKernelConfig this.durableStorage = durableStorage; this.faultTolerance = faultTolerance; this.destination = destination; - this.controllerId = controllerId; + this.controllerHost = controllerHost; this.workerIds = workerIds; + this.workerContextMap = workerContextMap; } public static Builder builder() @@ -130,6 +134,14 @@ public class ControllerQueryKernelConfig return workerIds; } + /** + * Map to include in {@link WorkOrder}, as {@link WorkOrder#getWorkerContext()}. 
+ */ + public Map getWorkerContextMap() + { + return workerContextMap; + } + @Override public boolean equals(Object o) { @@ -145,8 +157,10 @@ public class ControllerQueryKernelConfig && pipeline == that.pipeline && durableStorage == that.durableStorage && faultTolerance == that.faultTolerance - && Objects.equals(controllerId, that.controllerId) - && Objects.equals(workerIds, that.workerIds); + && Objects.equals(destination, that.destination) + && Objects.equals(controllerHost, that.controllerHost) + && Objects.equals(workerIds, that.workerIds) + && Objects.equals(workerContextMap, that.workerContextMap); } @Override @@ -158,8 +172,10 @@ public class ControllerQueryKernelConfig pipeline, durableStorage, faultTolerance, - controllerId, - workerIds + destination, + controllerHost, + workerIds, + workerContextMap ); } @@ -171,9 +187,11 @@ public class ControllerQueryKernelConfig ", maxConcurrentStages=" + maxConcurrentStages + ", pipeline=" + pipeline + ", durableStorage=" + durableStorage + - ", faultTolerant=" + faultTolerance + - ", controllerId='" + controllerId + '\'' + + ", faultTolerance=" + faultTolerance + + ", destination=" + destination + + ", controllerHost='" + controllerHost + '\'' + ", workerIds=" + workerIds + + ", workerContextMap=" + workerContextMap + '}'; } @@ -185,8 +203,9 @@ public class ControllerQueryKernelConfig private boolean durableStorage; private boolean faultTolerant; private MSQDestination destination; - private String controllerId; + private String controllerHost; private List workerIds; + private Map workerContextMap = Collections.emptyMap(); /** * Use {@link #builder()}. 
@@ -231,9 +250,9 @@ public class ControllerQueryKernelConfig return this; } - public Builder controllerId(final String controllerId) + public Builder controllerHost(final String controllerHost) { - this.controllerId = controllerId; + this.controllerHost = controllerHost; return this; } @@ -243,6 +262,12 @@ public class ControllerQueryKernelConfig return this; } + public Builder workerContextMap(final Map workerContextMap) + { + this.workerContextMap = workerContextMap; + return this; + } + public ControllerQueryKernelConfig build() { return new ControllerQueryKernelConfig( @@ -252,8 +277,9 @@ public class ControllerQueryKernelConfig durableStorage, faultTolerant, destination, - controllerId, - workerIds + controllerHost, + workerIds, + workerContextMap ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 63601c907a2..4ed98dca594 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -119,7 +119,6 @@ public class MultiStageQueryContext public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE = SegmentSource.NONE; public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages"; - public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1; public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false; public static final String CTX_SELECT_DESTINATION = "selectDestination"; @@ -206,11 +205,14 @@ public class MultiStageQueryContext ); } - public static int getMaxConcurrentStages(final QueryContext queryContext) + public static int getMaxConcurrentStagesWithDefault( + final QueryContext queryContext, + final int 
defaultMaxConcurrentStages + ) { return queryContext.getInt( CTX_MAX_CONCURRENT_STAGES, - DEFAULT_MAX_CONCURRENT_STAGES + defaultMaxConcurrentStages ); } @@ -336,16 +338,6 @@ public class MultiStageQueryContext ); } - @Nullable - public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext) - { - return QueryContexts.getAsEnum( - CTX_SELECT_DESTINATION, - queryContext.getString(CTX_SELECT_DESTINATION), - MSQSelectDestination.class - ); - } - public static int getRowsInMemory(final QueryContext queryContext) { return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java index d7364124483..c1d1030fb08 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java @@ -108,6 +108,7 @@ public class QueryValidatorTest Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles null, null, + null, null ); @@ -125,7 +126,7 @@ public class QueryValidatorTest QueryValidator.validateWorkOrder(workOrder); } - private static QueryDefinition createQueryDefinition(int numColumns, int numWorkers) + public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers) { QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java new file mode 100644 index 00000000000..32cd36d0998 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java @@ -0,0 +1,88 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +public class WorkerImplTest +{ + @Test + public void test_makeWorkOrderToUse_nothingMissing() + { + final WorkOrder workOrder = new WorkOrder( + QueryValidatorTest.createQueryDefinition(10, 2), + 0, + 0, + Collections.singletonList(() -> 1), + null, + null, + OutputChannelMode.MEMORY, + ImmutableMap.of("foo", "bar") + ); + + Assert.assertSame( + workOrder, + WorkerImpl.makeWorkOrderToUse( + workOrder, + QueryContext.of(ImmutableMap.of("foo", "baz")) /* Conflicts with workOrder context; should be ignored */ + ) + ); + } + + @Test + public void test_makeWorkOrderToUse_missingOutputChannelModeAndWorkerContext() + { + final Map taskContext = + ImmutableMap.of("foo", "bar", MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true); + + final WorkOrder workOrder = new WorkOrder( + QueryValidatorTest.createQueryDefinition(10, 2), + 
1, + 2, + Collections.singletonList(() -> 1), + null, + null, + null, + null + ); + + Assert.assertEquals( + new WorkOrder( + workOrder.getQueryDefinition(), + workOrder.getStageNumber(), + workOrder.getWorkerNumber(), + workOrder.getInputs(), + null, + null, + OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE, + taskContext + ), + WorkerImpl.makeWorkOrderToUse(workOrder, QueryContext.of(taskContext)) + ); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java new file mode 100644 index 00000000000..765101359f6 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class ControllerQueryKernelConfigTest
+{
+  @Test
+  public void testBuilder()
+  {
+    int maxRetainedPartitionSketchBytes = 1;
+    int maxConcurrentStages = 2;
+    boolean pipeline = false;
+    boolean durableStorage = true;
+    boolean faultTolerance = true;
+    MSQDestination destination = DurableStorageMSQDestination.instance();
+    String controllerHost = "controllerHost";
+    List<String> workerIds = ImmutableList.of("worker1", "worker2");
+    Map<String, Object> workerContextMap = ImmutableMap.of("foo", "bar");
+
+    final ControllerQueryKernelConfig config1 = new ControllerQueryKernelConfig( // built via the constructor
+        maxRetainedPartitionSketchBytes,
+        maxConcurrentStages,
+        pipeline,
+        durableStorage,
+        faultTolerance,
+        destination,
+        controllerHost,
+        workerIds,
+        workerContextMap
+    );
+
+    final ControllerQueryKernelConfig config2 = ControllerQueryKernelConfig // same values, set via the builder
+        .builder()
+        .maxRetainedPartitionSketchBytes(maxRetainedPartitionSketchBytes)
+        .maxConcurrentStages(maxConcurrentStages)
+        .pipeline(pipeline)
+        .durableStorage(durableStorage)
+        .faultTolerance(faultTolerance)
+        .destination(destination)
+        .controllerHost(controllerHost)
+        .workerIds(workerIds)
+        .workerContextMap(workerContextMap)
+        .build();
+
+    Assert.assertEquals(config1, config2); // the builder must yield a config equal to the constructor's
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ControllerQueryKernelConfig.class)
+                  .usingGetClass() // equals() is expected to compare getClass() rather than use instanceof
+                  .verify();
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 3034be39984..ed518afd2ef 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -59,7 +59,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -269,7 +268,7 @@ public class MSQTestControllerContext implements ControllerContext
   };
 
   @Override
-  public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef)
+  public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec) // queryId is not used in this body
   {
     return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000));
   }