Add configurable final stages to MSQ ingestion queries (#16699)

* Add a segmentMorphFactory to MSQ.

* Add test

* Make argument nullable

* Fix Guice issues

* Merge with master

* Remove extra information

* Fix tests

* Create a utils class

* Refactor segment generation

* Fix javadoc

* Refactor

* Refactor

* Fix injection
Adarsh Sanjeev 2024-08-27 11:35:48 +05:30 committed by GitHub
parent 16517e348e
commit 3b88b57d70
20 changed files with 668 additions and 389 deletions

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArraySet;
@ -38,10 +37,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
@ -82,7 +78,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@ -102,6 +97,8 @@ import org.apache.druid.msq.indexing.WorkerCount;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
@ -139,8 +136,6 @@ import org.apache.druid.msq.input.inline.InlineInputSpecSlicer;
import org.apache.druid.msq.input.lookup.LookupInputSpec;
import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer;
import org.apache.druid.msq.input.stage.InputChannels;
import org.apache.druid.msq.input.stage.ReadablePartition;
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpec;
@ -166,28 +161,21 @@ import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@ -195,7 +183,6 @@ import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.CompactionState;
@ -205,7 +192,6 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -218,7 +204,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -1192,11 +1177,16 @@ public class ControllerImpl implements Controller
{
if (MSQControllerTask.isIngestion(querySpec) &&
stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) {
// noinspection unchecked,rawtypes
return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate);
} else {
return null;
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec();
if (destination.getTerminalStageSpec() instanceof SegmentGenerationStageSpec) {
return (Int2ObjectMap) ((SegmentGenerationStageSpec) terminalStageSpec).getWorkerInfo(
workerInputs,
segmentsToGenerate
);
}
}
return null;
}
@SuppressWarnings("rawtypes")
@ -1212,35 +1202,6 @@ public class ControllerImpl implements Controller
return new MultiQueryKit(kitMap);
}
private Int2ObjectMap<List<SegmentIdWithShardSpec>> makeSegmentGeneratorWorkerFactoryInfos(
final WorkerInputs workerInputs,
final List<SegmentIdWithShardSpec> segmentsToGenerate
)
{
final Int2ObjectMap<List<SegmentIdWithShardSpec>> retVal = new Int2ObjectAVLTreeMap<>();
// Empty segments validation already happens when the stages are started -- so we cannot have both
// isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here.
if (segmentsToGenerate.isEmpty()) {
return retVal;
}
for (final int workerNumber : workerInputs.workers()) {
// SegmentGenerator stage has a single input from another stage.
final StageInputSlice stageInputSlice =
(StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber));
final List<SegmentIdWithShardSpec> workerSegments = new ArrayList<>();
retVal.put(workerNumber, workerSegments);
for (final ReadablePartition partition : stageInputSlice.getPartitions()) {
workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber()));
}
}
return retVal;
}
/**
* A blocking function used to contact multiple workers. Checks if all the workers are running before contacting them.
*
@ -1816,9 +1777,6 @@ public class ControllerImpl implements Controller
}
if (MSQControllerTask.isIngestion(querySpec)) {
final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();
final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
// Find the stage that provides shuffled input to the final segment-generation stage.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
@ -1845,24 +1803,15 @@ public class ControllerImpl implements Controller
}
}
// Then, add a segment-generation stage.
final DataSchema dataSchema =
makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.processorFactory(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
columnMappings,
tuningConfig
)
)
);
return builder.build();
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
return builder.add(
destination.getTerminalStageSpec()
.constructFinalStage(
queryDef,
querySpec,
jsonMapper)
)
.build();
} else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) {
return queryDef;
} else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) {
@ -1933,112 +1882,6 @@ public class ControllerImpl implements Controller
return ((DataSourceMSQDestination) querySpec.getDestination()).getDataSource();
}
private static DataSchema makeDataSchemaForIngestion(
MSQSpec querySpec,
RowSignature querySignature,
ClusterBy queryClusterBy,
ColumnMappings columnMappings,
ObjectMapper jsonMapper
)
{
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
final boolean isRollupQuery = isRollupQuery(querySpec.getQuery());
final boolean forceSegmentSortByTime =
MultiStageQueryContext.isForceSegmentSortByTime(querySpec.getQuery().context());
final Pair<DimensionsSpec, List<AggregatorFactory>> dimensionsAndAggregators =
makeDimensionsAndAggregatorsForIngestion(
querySignature,
queryClusterBy,
destination.getSegmentSortOrder(),
forceSegmentSortByTime,
columnMappings,
isRollupQuery,
querySpec.getQuery(),
destination.getDimensionSchemas()
);
return new DataSchema(
destination.getDataSource(),
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
dimensionsAndAggregators.lhs,
dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
new TransformSpec(null, Collections.emptyList())
);
}
private static GranularitySpec makeGranularitySpecForIngestion(
final Query<?> query,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final ObjectMapper jsonMapper
)
{
if (isRollupQuery) {
final String queryGranularityString =
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) {
final Granularity queryGranularity;
try {
queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY);
}
return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY);
} else {
return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY);
}
}
/**
* Checks that a {@link GroupByQuery} is grouping on the primary time column.
* <p>
* The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the
* output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the
* presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter
* {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions
* is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field
* from {@link ColumnMappings}, are the same.
*/
private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings)
{
final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME);
if (positions.size() == 1) {
final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0));
return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD));
} else {
return false;
}
}
/**
* Whether a native query represents an ingestion with rollup.
* <p>
* Checks for three things:
* <p>
* - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and
* aggregations.
* - The query must not finalize aggregations, because rollup requires inserting the intermediate type of
* complex aggregations, not the finalized type. (So further rollup is possible.)
* - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because
* groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time
* (rollup expects multi-value dimensions to be treated as arrays).
*/
private static boolean isRollupQuery(Query<?> query)
{
return query instanceof GroupByQuery
&& !MultiStageQueryContext.isFinalizeAggregations(query.context())
&& !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true);
}
/**
* Compute shard columns for {@link DimensionRangeShardSpec}. Returns an empty list if range-based sharding
* is not applicable.
@ -2147,197 +1990,6 @@ public class ControllerImpl implements Controller
return new StringTuple(array);
}
private static DimensionSchema getDimensionSchema(
final String outputColumnName,
@Nullable final ColumnType queryType,
QueryContext context,
@Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) {
return dimensionSchemas.get(outputColumnName);
}
// In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup,
// we won't have an entry in the map. For those cases, use the default config.
return DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
queryType,
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.getArrayIngestMode(context)
);
}
private static Pair<DimensionsSpec, List<AggregatorFactory>> makeDimensionsAndAggregatorsForIngestion(
final RowSignature querySignature,
final ClusterBy queryClusterBy,
final List<String> contextSegmentSortOrder,
final boolean forceSegmentSortByTime,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final Query<?> query,
@Nullable final Map<String, DimensionSchema> dimensionSchemas
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
// deprecation and removal in future
if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) {
log.warn(
"%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as "
+ "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be "
+ "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer "
+ "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write "
+ "out multi-value string dimensions using ARRAY_TO_MV. "
+ "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
}
final List<DimensionSchema> dimensions = new ArrayList<>();
final List<AggregatorFactory> aggregators = new ArrayList<>();
// During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want
// this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in
// that order.
// Start with segmentSortOrder.
final Set<String> outputColumnsInOrder = new LinkedHashSet<>(contextSegmentSortOrder);
// Then __time, if it's an output column and forceSegmentSortByTime is set.
if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME) && forceSegmentSortByTime) {
outputColumnsInOrder.add(ColumnHolder.TIME_COLUMN_NAME);
}
// Then the query-level CLUSTERED BY.
// Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
for (final int outputColumn : outputColumns) {
outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));
}
}
// Then all other columns.
outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames());
Map<String, AggregatorFactory> outputColumnAggregatorFactories = new HashMap<>();
if (isRollupQuery) {
// Populate aggregators from the native query when doing an ingest in rollup mode.
for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) {
for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) {
final String outputColumnName = columnMappings.getOutputColumnName(outputColumn);
if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
throw new ISE("There can only be one aggregation for column [%s].", outputColumn);
} else {
outputColumnAggregatorFactories.put(
outputColumnName,
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
);
}
}
}
}
// Each column can be either a dimension or an aggregator.
// For non-complex columns, if the aggregator factory of the column is not available, we treat the column as
// a dimension. For complex columns, certain hacks are in place.
for (final String outputColumnName : outputColumnsInOrder) {
// CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require
// that output names be unique.
final int outputColumn = CollectionUtils.getOnlyElement(
columnMappings.getOutputColumnsByName(outputColumnName),
xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs)
);
final String queryColumn = columnMappings.getQueryColumnName(outputColumn);
final ColumnType type =
querySignature.getColumnType(queryColumn)
.orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName));
if (!type.is(ValueType.COMPLEX)) {
// non complex columns
populateDimensionsAndAggregators(
dimensions,
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context(),
dimensionSchemas
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(
getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
} else {
populateDimensionsAndAggregators(
dimensions,
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context(),
dimensionSchemas
);
}
}
}
final DimensionsSpec.Builder dimensionsSpecBuilder = DimensionsSpec.builder();
if (!dimensions.isEmpty() && dimensions.get(0).getName().equals(ColumnHolder.TIME_COLUMN_NAME)) {
// Skip __time if it's in the first position, for compatibility with legacy dimensionSpecs.
dimensions.remove(0);
dimensionsSpecBuilder.setForceSegmentSortByTime(null);
} else {
// Store explicit forceSegmentSortByTime only if false, for compatibility with legacy dimensionSpecs.
dimensionsSpecBuilder.setForceSegmentSortByTime(forceSegmentSortByTime ? null : false);
}
return Pair.of(dimensionsSpecBuilder.setDimensions(dimensions).build(), aggregators);
}
/**
* If the output column is present in the outputColumnAggregatorFactories map, we already have the aggregator information for this column;
* otherwise, treat this column as a dimension.
*
* @param dimensions list is populated if the output column is deemed to be a dimension
* @param aggregators list is populated with the aggregator if the output column is deemed to be an aggregation column.
* @param outputColumnAggregatorFactories output col -> AggregatorFactory map
* @param outputColumn column name
* @param type columnType
*/
private static void populateDimensionsAndAggregators(
List<DimensionSchema> dimensions,
List<AggregatorFactory> aggregators,
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type,
QueryContext context,
Map<String, DimensionSchema> dimensionSchemas
)
{
if (ColumnHolder.TIME_COLUMN_NAME.equals(outputColumn)) {
if (!type.is(ValueType.LONG)) {
throw DruidException.defensive("Incorrect type[%s] for column[%s]", type, outputColumn);
}
dimensions.add(new LongDimensionSchema(outputColumn));
} else if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
getDimensionSchema(outputColumn, type, context, dimensionSchemas)
);
}
}
private static DateTime getBucketDateTime(
final ClusterByPartition partitionBoundary,
final Granularity segmentGranularity,
@ -2815,7 +2467,8 @@ public class ControllerImpl implements Controller
for (final StageId stageId : newStageIds) {
// Allocate segments, if this is the final stage of an ingestion.
if (MSQControllerTask.isIngestion(querySpec)
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()) {
&& stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()
&& (((DataSourceMSQDestination) querySpec.getDestination()).getTerminalStageSpec() instanceof SegmentGenerationStageSpec)) {
populateSegmentsToGenerate();
}

View File

@ -54,6 +54,8 @@ public class MSQSqlModule implements DruidModule
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.bind(MSQTerminalStageSpecFactory.class).toInstance(new MSQTerminalStageSpecFactory());
binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class);
// Set up the EXTERN macro.
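
The factory is bound here as a plain instance, so a deployment that wants a different terminal stage could override this binding. A minimal sketch, illustrative only (the wrapper class and the custom factory instance are hypothetical, not part of this change), using Guice's Modules.override to swap in another factory:

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;

public class TerminalStageSpecOverride
{
  // Wraps an existing module so that MSQTerminalStageSpecFactory resolves to the supplied
  // custom instance instead of the default bound above. Names here are illustrative.
  public static Module withCustomFactory(Module msqSqlModule, MSQTerminalStageSpecFactory customFactory)
  {
    Module override = (Binder binder) ->
        binder.bind(MSQTerminalStageSpecFactory.class).toInstance(customFactory);
    return Modules.override(msqSqlModule).with(override);
  }
}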

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.guice;
import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;
public class MSQTerminalStageSpecFactory
{
/**
* Creates a {@link TerminalStageSpec} which determines the final stage of a query. Currently, this always returns a segment
* generation spec, but it can be used to configure a wide range of behaviours.
*/
public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext)
{
return SegmentGenerationStageSpec.instance();
}
}
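
Since createTerminalStageSpec is an instance method on a concrete class, one way to plug in different behaviour is to subclass the factory. The sketch below is purely illustrative (the subclass name is hypothetical) and still falls back to the default segment-generation stage:

import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;

public class ContextAwareTerminalStageSpecFactory extends MSQTerminalStageSpecFactory
{
  @Override
  public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext)
  {
    // A real override could inspect druidQuery or plannerContext and return a different
    // TerminalStageSpec; this sketch simply keeps the default segment-generation behaviour.
    return SegmentGenerationStageSpec.instance();
  }
}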

View File

@ -243,7 +243,8 @@ public class MSQCompactionRunner implements CompactionRunner
dataSchema.getDimensionsSpec()
.getDimensions()
.stream()
.collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
.collect(Collectors.toMap(DimensionSchema::getName, Function.identity())),
null
);
}

View File

@ -51,6 +51,8 @@ public class DataSourceMSQDestination implements MSQDestination
@Nullable
private final List<Interval> replaceTimeChunks;
private final TerminalStageSpec terminalStageSpec;
@Nullable
private final Map<String, DimensionSchema> dimensionSchemas;
@ -60,7 +62,8 @@ public class DataSourceMSQDestination implements MSQDestination
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("dimensionSchemas") @Nullable Map<String, DimensionSchema> dimensionSchemas
@JsonProperty("dimensionSchemas") @Nullable Map<String, DimensionSchema> dimensionSchemas,
@JsonProperty("terminalStageSpec") @Nullable TerminalStageSpec terminalStageSpec
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@ -68,6 +71,7 @@ public class DataSourceMSQDestination implements MSQDestination
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.dimensionSchemas = dimensionSchemas;
this.terminalStageSpec = terminalStageSpec != null ? terminalStageSpec : SegmentGenerationStageSpec.instance();
if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
@ -105,6 +109,17 @@ public class DataSourceMSQDestination implements MSQDestination
return dataSource;
}
/**
* Returns the terminal stage spec.
* <p>
* The terminal stage spec is a way to tell the MSQ task how to convert the results into segments at the final stage.
*/
@JsonProperty
public TerminalStageSpec getTerminalStageSpec()
{
return terminalStageSpec;
}
@JsonProperty
public Granularity getSegmentGranularity()
{
@ -177,13 +192,14 @@ public class DataSourceMSQDestination implements MSQDestination
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(dimensionSchemas, that.dimensionSchemas);
&& Objects.equals(dimensionSchemas, that.dimensionSchemas)
&& Objects.equals(terminalStageSpec, that.terminalStageSpec);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas);
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas, terminalStageSpec);
}
@Override
@ -195,6 +211,7 @@ public class DataSourceMSQDestination implements MSQDestination
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
", dimensionSchemas=" + dimensionSchemas +
", terminalStageSpec=" + terminalStageSpec +
'}';
}
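
For reference, a hedged sketch of a call to the widened constructor (the surrounding class and the data source name are illustrative); passing null for terminalStageSpec preserves the old behaviour because the constructor falls back to SegmentGenerationStageSpec.instance():

import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;

public class DestinationExamples
{
  static DataSourceMSQDestination destinationWithDefaultTerminalStage()
  {
    return new DataSourceMSQDestination(
        "wikipedia",        // dataSource (illustrative)
        Granularities.DAY,  // segmentGranularity
        null,               // segmentSortOrder
        null,               // replaceTimeChunks
        null,               // dimensionSchemas
        null                // terminalStageSpec; null falls back to SegmentGenerationStageSpec.instance()
    );
  }
}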

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.destination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.input.stage.ReadablePartition;
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.kernel.controller.WorkerInputs;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class SegmentGenerationStageSpec implements TerminalStageSpec
{
public static final String TYPE = "segmentGeneration";
private static final SegmentGenerationStageSpec INSTANCE = new SegmentGenerationStageSpec();
private SegmentGenerationStageSpec()
{
}
@JsonCreator
public static SegmentGenerationStageSpec instance()
{
return INSTANCE;
}
@Override
public StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
final ColumnMappings columnMappings = querySpec.getColumnMappings();
final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature();
final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
// Add a segment-generation stage.
final DataSchema dataSchema =
SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
return StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.processorFactory(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
columnMappings,
tuningConfig
)
);
}
public Int2ObjectMap<List<SegmentIdWithShardSpec>> getWorkerInfo(
final WorkerInputs workerInputs,
@Nullable final List<SegmentIdWithShardSpec> segmentsToGenerate
)
{
final Int2ObjectMap<List<SegmentIdWithShardSpec>> retVal = new Int2ObjectAVLTreeMap<>();
// Empty segments validation already happens when the stages are started -- so we cannot have both
// isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here.
if (segmentsToGenerate == null || segmentsToGenerate.isEmpty()) {
return retVal;
}
for (final int workerNumber : workerInputs.workers()) {
// SegmentGenerator stage has a single input from another stage.
final StageInputSlice stageInputSlice =
(StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber));
final List<SegmentIdWithShardSpec> workerSegments = new ArrayList<>();
retVal.put(workerNumber, workerSegments);
for (final ReadablePartition partition : stageInputSlice.getPartitions()) {
workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber()));
}
}
return retVal;
}
}

View File

@ -0,0 +1,374 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.destination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public final class SegmentGenerationUtils
{
private static final Logger log = new Logger(SegmentGenerationUtils.class);
public static DataSchema makeDataSchemaForIngestion(
MSQSpec querySpec,
RowSignature querySignature,
ClusterBy queryClusterBy,
ColumnMappings columnMappings,
ObjectMapper jsonMapper
)
{
final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
final boolean isRollupQuery = isRollupQuery(querySpec.getQuery());
final boolean forceSegmentSortByTime =
MultiStageQueryContext.isForceSegmentSortByTime(querySpec.getQuery().context());
final Pair<DimensionsSpec, List<AggregatorFactory>> dimensionsAndAggregators =
makeDimensionsAndAggregatorsForIngestion(
querySignature,
queryClusterBy,
destination.getSegmentSortOrder(),
forceSegmentSortByTime,
columnMappings,
isRollupQuery,
querySpec.getQuery(),
destination.getDimensionSchemas()
);
return new DataSchema(
destination.getDataSource(),
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
dimensionsAndAggregators.lhs,
dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
new TransformSpec(null, Collections.emptyList())
);
}
private static GranularitySpec makeGranularitySpecForIngestion(
final Query<?> query,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final ObjectMapper jsonMapper
)
{
if (isRollupQuery) {
final String queryGranularityString =
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) {
final Granularity queryGranularity;
try {
queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY);
}
return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY);
} else {
return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY);
}
}
/**
* Checks that a {@link GroupByQuery} is grouping on the primary time column.
* <p>
* The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the
* output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the
* presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter
* {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions
* is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field
* from {@link ColumnMappings}, are the same.
*/
private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings)
{
final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME);
if (positions.size() == 1) {
final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0));
return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD));
} else {
return false;
}
}
/**
* Whether a native query represents an ingestion with rollup.
* <p>
* Checks for three things:
* <p>
* - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and
* aggregations.
* - The query must not finalize aggregations, because rollup requires inserting the intermediate type of
* complex aggregations, not the finalized type. (So further rollup is possible.)
* - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because
* groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time
* (rollup expects multi-value dimensions to be treated as arrays).
*/
private static boolean isRollupQuery(Query<?> query)
{
return query instanceof GroupByQuery
&& !MultiStageQueryContext.isFinalizeAggregations(query.context())
&& !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true);
}
private static DimensionSchema getDimensionSchema(
final String outputColumnName,
@Nullable final ColumnType queryType,
QueryContext context,
@Nullable Map<String, DimensionSchema> dimensionSchemas
)
{
if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) {
return dimensionSchemas.get(outputColumnName);
}
// In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup,
// we won't have an entry in the map. For those cases, use the default config.
return DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
queryType,
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.getArrayIngestMode(context)
);
}
private static Pair<DimensionsSpec, List<AggregatorFactory>> makeDimensionsAndAggregatorsForIngestion(
final RowSignature querySignature,
final ClusterBy queryClusterBy,
final List<String> contextSegmentSortOrder,
final boolean forceSegmentSortByTime,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final Query<?> query,
@Nullable final Map<String, DimensionSchema> dimensionSchemas
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
// deprecation and removal in future
if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) {
log.warn(
"%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as "
+ "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be "
+ "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer "
+ "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write "
+ "out multi-value string dimensions using ARRAY_TO_MV. "
+ "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
}
final List<DimensionSchema> dimensions = new ArrayList<>();
final List<AggregatorFactory> aggregators = new ArrayList<>();
// During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want
// this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in
// that order.
// Start with segmentSortOrder.
final Set<String> outputColumnsInOrder = new LinkedHashSet<>(contextSegmentSortOrder);
// Then __time, if it's an output column and forceSegmentSortByTime is set.
if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME) && forceSegmentSortByTime) {
outputColumnsInOrder.add(ColumnHolder.TIME_COLUMN_NAME);
}
// Then the query-level CLUSTERED BY.
// Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
for (final int outputColumn : outputColumns) {
outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));
}
}
// Then all other columns.
outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames());
Map<String, AggregatorFactory> outputColumnAggregatorFactories = new HashMap<>();
if (isRollupQuery) {
// Populate aggregators from the native query when doing an ingest in rollup mode.
for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) {
for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) {
final String outputColumnName = columnMappings.getOutputColumnName(outputColumn);
if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
throw new ISE("There can only be one aggregation for column [%s].", outputColumn);
} else {
outputColumnAggregatorFactories.put(
outputColumnName,
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
);
}
}
}
}
// Each column can be either a dimension or an aggregator.
// For non-complex columns, if the aggregator factory of the column is not available, we treat the column as
// a dimension. For complex columns, certain hacks are in place.
for (final String outputColumnName : outputColumnsInOrder) {
// CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require
// that output names be unique.
final int outputColumn = CollectionUtils.getOnlyElement(
columnMappings.getOutputColumnsByName(outputColumnName),
xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs)
);
final String queryColumn = columnMappings.getQueryColumnName(outputColumn);
final ColumnType type =
querySignature.getColumnType(queryColumn)
.orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName));
if (!type.is(ValueType.COMPLEX)) {
// non complex columns
populateDimensionsAndAggregators(
dimensions,
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context(),
dimensionSchemas
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(
getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
} else {
populateDimensionsAndAggregators(
dimensions,
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context(),
dimensionSchemas
);
}
}
}
final DimensionsSpec.Builder dimensionsSpecBuilder = DimensionsSpec.builder();
if (!dimensions.isEmpty() && dimensions.get(0).getName().equals(ColumnHolder.TIME_COLUMN_NAME)) {
// Skip __time if it's in the first position, for compatibility with legacy dimensionSpecs.
dimensions.remove(0);
dimensionsSpecBuilder.setForceSegmentSortByTime(null);
} else {
// Store explicit forceSegmentSortByTime only if false, for compatibility with legacy dimensionSpecs.
dimensionsSpecBuilder.setForceSegmentSortByTime(forceSegmentSortByTime ? null : false);
}
return Pair.of(dimensionsSpecBuilder.setDimensions(dimensions).build(), aggregators);
}
/**
* If the output column is present in the outputColumnAggregatorFactories map, we already have the aggregator information for this column;
* otherwise, treat this column as a dimension.
*
* @param dimensions list is populated if the output column is deemed to be a dimension
* @param aggregators list is populated with the aggregator if the output column is deemed to be an aggregation column.
* @param outputColumnAggregatorFactories output col -> AggregatorFactory map
* @param outputColumn column name
* @param type columnType
*/
private static void populateDimensionsAndAggregators(
List<DimensionSchema> dimensions,
List<AggregatorFactory> aggregators,
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type,
QueryContext context,
Map<String, DimensionSchema> dimensionSchemas
)
{
if (ColumnHolder.TIME_COLUMN_NAME.equals(outputColumn)) {
if (!type.is(ValueType.LONG)) {
throw DruidException.defensive("Incorrect type[%s] for column[%s]", type, outputColumn);
}
dimensions.add(new LongDimensionSchema(outputColumn));
} else if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
getDimensionSchema(outputColumn, type, context, dimensionSchemas)
);
}
}
private SegmentGenerationUtils()
{
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.destination;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
/**
* Determines the final stage of a {@link DataSourceMSQDestination}.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = SegmentGenerationStageSpec.TYPE, value = SegmentGenerationStageSpec.class)
})
public interface TerminalStageSpec
{
StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper);
}
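
To illustrate the extension point (not part of this change), a hypothetical implementation would implement constructFinalStage and be registered as a Jackson subtype so it can appear in a serialized DataSourceMSQDestination. The class below is a sketch that merely delegates to the built-in segment-generation stage:

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;

// Hypothetical subtype name; it would need to be registered with the ObjectMapper
// (for example via ObjectMapper#registerSubtypes) since it is not listed in @JsonSubTypes above.
@JsonTypeName("delegatingExample")
public class DelegatingTerminalStageSpec implements TerminalStageSpec
{
  @Override
  public StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper)
  {
    // Delegates to the default segment-generation stage; a real implementation would build its own stage here.
    return SegmentGenerationStageSpec.instance().constructFinalStage(queryDef, querySpec, jsonMapper);
  }
}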

View File

@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
@ -92,6 +93,7 @@ public class MSQTaskQueryMaker implements QueryMaker
private final PlannerContext plannerContext;
private final ObjectMapper jsonMapper;
private final List<Entry<Integer, String>> fieldMapping;
private final MSQTerminalStageSpecFactory terminalStageSpecFactory;
MSQTaskQueryMaker(
@ -99,7 +101,8 @@ public class MSQTaskQueryMaker implements QueryMaker
final OverlordClient overlordClient,
final PlannerContext plannerContext,
final ObjectMapper jsonMapper,
final List<Entry<Integer, String>> fieldMapping
final List<Entry<Integer, String>> fieldMapping,
final MSQTerminalStageSpecFactory terminalStageSpecFactory
)
{
this.targetDataSource = targetDataSource;
@ -107,6 +110,7 @@ public class MSQTaskQueryMaker implements QueryMaker
this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping");
this.terminalStageSpecFactory = terminalStageSpecFactory;
}
@Override
@ -247,7 +251,11 @@ public class MSQTaskQueryMaker implements QueryMaker
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks,
null
null,
terminalStageSpecFactory.createTerminalStageSpec(
druidQuery,
plannerContext
)
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, dataSourceDestination.isReplaceTimeChunks());
destination = dataSourceDestination;

View File

@ -20,6 +20,7 @@
package org.apache.druid.msq.sql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
@ -42,6 +43,7 @@ import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
@ -85,15 +87,24 @@ public class MSQTaskSqlEngine implements SqlEngine
private final OverlordClient overlordClient;
private final ObjectMapper jsonMapper;
private final MSQTerminalStageSpecFactory terminalStageSpecFactory;
@Inject
public MSQTaskSqlEngine(final OverlordClient overlordClient, final ObjectMapper jsonMapper)
{
this(overlordClient, jsonMapper, new MSQTerminalStageSpecFactory());
}
@VisibleForTesting
public MSQTaskSqlEngine(
final OverlordClient overlordClient,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final MSQTerminalStageSpecFactory terminalStageSpecFactory
)
{
this.overlordClient = overlordClient;
this.jsonMapper = jsonMapper;
this.terminalStageSpecFactory = terminalStageSpecFactory;
}
@Override
@ -162,7 +173,8 @@ public class MSQTaskSqlEngine implements SqlEngine
overlordClient,
plannerContext,
jsonMapper,
relRoot.fields
relRoot.fields,
terminalStageSpecFactory
);
}
@ -195,7 +207,8 @@ public class MSQTaskSqlEngine implements SqlEngine
overlordClient,
plannerContext,
jsonMapper,
relRoot.fields
relRoot.fields,
terminalStageSpecFactory
);
}

View File

@ -39,7 +39,7 @@ import java.util.Objects;
* Hack that allows "passing through" arbitrary complex types into
* {@link org.apache.druid.segment.incremental.IncrementalIndex}.
*
* Used by {@link org.apache.druid.msq.exec.ControllerImpl#makeDimensionsAndAggregatorsForIngestion}.
* Used by {@link org.apache.druid.msq.indexing.destination.SegmentGenerationUtils#makeDimensionsAndAggregatorsForIngestion}.
*
* To move away from this, it would need to be possible to create complex columns in segments only knowing the complex
* type; in particular, without knowing the type of an aggregator factory or dimension schema that corresponds to

View File

@ -225,7 +225,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("v1", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
@ -318,7 +318,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("agent_category", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(runtimeContext)

View File

@ -1375,7 +1375,6 @@ public class MSQReplaceTest extends MSQTestBase
+ "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' "
+ "PARTITIONED BY MONTH")
.setExpectedDataSource("foo")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2002-01-01T")))
@ -1432,7 +1431,6 @@ public class MSQReplaceTest extends MSQTestBase
+ "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' "
+ "PARTITIONED BY MONTH")
.setExpectedDataSource("foo")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Collections.singletonList(Intervals.ETERNITY))
@ -1480,7 +1478,6 @@ public class MSQReplaceTest extends MSQTestBase
"REPLACE INTO foo1 OVERWRITE ALL "
+ "select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedMSQSegmentReport(
new MSQSegmentReport(

View File

@ -290,7 +290,8 @@ public class MSQCompactionRunnerTest
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
Collections.singletonList(COMPACTION_INTERVAL),
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())),
null
),
actualMSQSpec.getDestination()
);
@ -360,7 +361,8 @@ public class MSQCompactionRunnerTest
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
Collections.singletonList(COMPACTION_INTERVAL),
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())),
null
),
actualMSQSpec.getDestination()
);

View File

@ -59,6 +59,7 @@ public class MSQControllerTaskTest
Granularities.DAY,
null,
INTERVALS,
null,
null
))
.query(new Druids.ScanQueryBuilder()

View File

@ -20,10 +20,14 @@
package org.apache.druid.msq.indexing.destination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
@ -58,4 +62,17 @@ public class DataSourceMSQDestinationTest
.usingGetClass()
.verify();
}
@Test
public void testBackwardCompatibility() throws JsonProcessingException
{
DataSourceMSQDestination destination = new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null);
Assert.assertEquals(SegmentGenerationStageSpec.instance(), destination.getTerminalStageSpec());
DataSourceMSQDestination dataSourceMSQDestination = new DefaultObjectMapper().readValue(
"{\"type\":\"dataSource\",\"dataSource\":\"datasource1\",\"segmentGranularity\":\"DAY\",\"rowsInTaskReport\":0,\"destinationResource\":{\"empty\":false,\"present\":true}}",
DataSourceMSQDestination.class
);
Assert.assertEquals(SegmentGenerationStageSpec.instance(), dataSourceMSQDestination.getTerminalStageSpec());
}
}

View File

@ -200,6 +200,7 @@ public class SqlStatementResourceTest extends MSQTestBase
Granularities.DAY,
null,
null,
null,
null
))
.tuningConfig(

View File

@ -91,6 +91,7 @@ import org.apache.druid.msq.guice.MSQDurableStorageModule;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.guice.MSQSqlModule;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.MSQControllerTask;
@ -210,7 +211,6 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -550,7 +550,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
final SqlEngine engine = new MSQTaskSqlEngine(
indexingServiceClient,
qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules())
qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()),
new MSQTerminalStageSpecFactory()
);
PlannerFactory plannerFactory = new PlannerFactory(

View File

@ -40,7 +40,7 @@ public class MSQTestDelegateDataSegmentPusher implements DataSegmentPusher
MSQTestSegmentManager segmentManager
)
{
delegate = dataSegmentPusher;
this.delegate = dataSegmentPusher;
this.segmentManager = segmentManager;
}

View File

@ -376,6 +376,7 @@ public class SqlStatementResourceHelperTest
Granularities.DAY,
null,
null,
null,
null
)
);