From 3b88b57d7097d6fe8771e25a90bfaedccddb3c42 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 27 Aug 2024 11:35:48 +0530 Subject: [PATCH] Add configurable final stages to MSQ ingestion queries (#16699) * Add a segmentMorphFactory to MSQ. * Add test * Make argument nullable * Fix Guice issues * Merge with master * Remove extra information * Fix tests * Create a utils class * Refactor segment generation * Fix javadoc * Refactor * Refactor * Fix injection --- .../apache/druid/msq/exec/ControllerImpl.java | 391 +----------------- .../apache/druid/msq/guice/MSQSqlModule.java | 2 + .../guice/MSQTerminalStageSpecFactory.java | 37 ++ .../msq/indexing/MSQCompactionRunner.java | 3 +- .../destination/DataSourceMSQDestination.java | 23 +- .../SegmentGenerationStageSpec.java | 115 ++++++ .../destination/SegmentGenerationUtils.java | 374 +++++++++++++++++ .../destination/TerminalStageSpec.java | 39 ++ .../druid/msq/sql/MSQTaskQueryMaker.java | 12 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 19 +- .../util/PassthroughAggregatorFactory.java | 2 +- .../msq/exec/MSQParseExceptionsTest.java | 4 +- .../apache/druid/msq/exec/MSQReplaceTest.java | 3 - .../msq/indexing/MSQCompactionRunnerTest.java | 6 +- .../msq/indexing/MSQControllerTaskTest.java | 1 + .../DataSourceMSQDestinationTest.java | 17 + .../resources/SqlStatementResourceTest.java | 1 + .../apache/druid/msq/test/MSQTestBase.java | 5 +- .../MSQTestDelegateDataSegmentPusher.java | 2 +- .../util/SqlStatementResourceHelperTest.java | 1 + 20 files changed, 668 insertions(+), 389 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 875212605b8..46a24611351 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -29,7 +29,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntArraySet; @@ -38,10 +37,7 @@ import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.LongDimensionSchema; -import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.BrokerClient; import org.apache.druid.error.DruidException; import 
org.apache.druid.frame.allocation.ArenaMemoryAllocator; @@ -82,7 +78,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -102,6 +97,8 @@ import org.apache.druid.msq.indexing.WorkerCount; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.ExportMSQDestination; +import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec; +import org.apache.druid.msq.indexing.destination.TerminalStageSpec; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.FaultsExceededChecker; @@ -139,8 +136,6 @@ import org.apache.druid.msq.input.inline.InlineInputSpecSlicer; import org.apache.druid.msq.input.lookup.LookupInputSpec; import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer; import org.apache.druid.msq.input.stage.InputChannels; -import org.apache.druid.msq.input.stage.ReadablePartition; -import org.apache.druid.msq.input.stage.StageInputSlice; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.input.stage.StageInputSpecSlicer; import org.apache.druid.msq.input.table.TableInputSpec; @@ -166,28 +161,21 @@ import org.apache.druid.msq.querykit.scan.ScanQueryKit; import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory; import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; -import org.apache.druid.msq.util.ArrayIngestMode; -import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.msq.util.IntervalUtils; import org.apache.druid.msq.util.MSQFutureUtils; import org.apache.druid.msq.util.MSQTaskQueryMakerUtils; import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.msq.util.PassthroughAggregatorFactory; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -195,7 +183,6 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.DruidNode; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import 
org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.timeline.CompactionState; @@ -205,7 +192,6 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CloseableUtils; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -218,7 +204,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1192,11 +1177,16 @@ public class ControllerImpl implements Controller { if (MSQControllerTask.isIngestion(querySpec) && stageNumber == queryDef.getFinalStageDefinition().getStageNumber()) { - // noinspection unchecked,rawtypes - return (Int2ObjectMap) makeSegmentGeneratorWorkerFactoryInfos(workerInputs, segmentsToGenerate); - } else { - return null; + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + TerminalStageSpec terminalStageSpec = destination.getTerminalStageSpec(); + if (destination.getTerminalStageSpec() instanceof SegmentGenerationStageSpec) { + return (Int2ObjectMap) ((SegmentGenerationStageSpec) terminalStageSpec).getWorkerInfo( + workerInputs, + segmentsToGenerate + ); + } } + return null; } @SuppressWarnings("rawtypes") @@ -1212,35 +1202,6 @@ public class ControllerImpl implements Controller return new MultiQueryKit(kitMap); } - private Int2ObjectMap> makeSegmentGeneratorWorkerFactoryInfos( - final WorkerInputs workerInputs, - final List segmentsToGenerate - ) - { - final Int2ObjectMap> retVal = new Int2ObjectAVLTreeMap<>(); - - // Empty segments validation already happens when the stages are started -- so we cannot have both - // isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here. - if (segmentsToGenerate.isEmpty()) { - return retVal; - } - - for (final int workerNumber : workerInputs.workers()) { - // SegmentGenerator stage has a single input from another stage. - final StageInputSlice stageInputSlice = - (StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber)); - - final List workerSegments = new ArrayList<>(); - retVal.put(workerNumber, workerSegments); - - for (final ReadablePartition partition : stageInputSlice.getPartitions()) { - workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber())); - } - } - - return retVal; - } - /** * A blocking function used to contact multiple workers. Checks if all the workers are running before contacting them. * @@ -1816,9 +1777,6 @@ public class ControllerImpl implements Controller } if (MSQControllerTask.isIngestion(querySpec)) { - final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature(); - final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); - // Find the stage that provides shuffled input to the final segment-generation stage. StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); @@ -1845,24 +1803,15 @@ public class ControllerImpl implements Controller } } - // Then, add a segment-generation stage. 
- final DataSchema dataSchema = - makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); - - builder.add( - StageDefinition.builder(queryDef.getNextStageNumber()) - .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) - .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .processorFactory( - new SegmentGeneratorFrameProcessorFactory( - dataSchema, - columnMappings, - tuningConfig - ) - ) - ); - - return builder.build(); + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + return builder.add( + destination.getTerminalStageSpec() + .constructFinalStage( + queryDef, + querySpec, + jsonMapper) + ) + .build(); } else if (MSQControllerTask.writeFinalResultsToTaskReport(querySpec)) { return queryDef; } else if (MSQControllerTask.writeFinalStageResultsToDurableStorage(querySpec)) { @@ -1933,112 +1882,6 @@ public class ControllerImpl implements Controller return ((DataSourceMSQDestination) querySpec.getDestination()).getDataSource(); } - private static DataSchema makeDataSchemaForIngestion( - MSQSpec querySpec, - RowSignature querySignature, - ClusterBy queryClusterBy, - ColumnMappings columnMappings, - ObjectMapper jsonMapper - ) - { - final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); - final boolean isRollupQuery = isRollupQuery(querySpec.getQuery()); - final boolean forceSegmentSortByTime = - MultiStageQueryContext.isForceSegmentSortByTime(querySpec.getQuery().context()); - - final Pair> dimensionsAndAggregators = - makeDimensionsAndAggregatorsForIngestion( - querySignature, - queryClusterBy, - destination.getSegmentSortOrder(), - forceSegmentSortByTime, - columnMappings, - isRollupQuery, - querySpec.getQuery(), - destination.getDimensionSchemas() - ); - - return new DataSchema( - destination.getDataSource(), - new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), - dimensionsAndAggregators.lhs, - dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]), - makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper), - new TransformSpec(null, Collections.emptyList()) - ); - } - - private static GranularitySpec makeGranularitySpecForIngestion( - final Query query, - final ColumnMappings columnMappings, - final boolean isRollupQuery, - final ObjectMapper jsonMapper - ) - { - if (isRollupQuery) { - final String queryGranularityString = - query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, ""); - - if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) { - final Granularity queryGranularity; - - try { - queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY); - } - return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY); - } else { - return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY); - } - } - - /** - * Checks that a {@link GroupByQuery} is grouping on the primary time column. - *

- * The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the - * output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the - * presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter - * {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions - * is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field - * from {@link ColumnMappings}, are the same. - */ - private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings) - { - final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME); - - if (positions.size() == 1) { - final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0)); - return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD)); - } else { - return false; - } - } - - /** - * Whether a native query represents an ingestion with rollup. - *

- * Checks for three things: - *

- * - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and - * aggregations. - * - The query must not finalize aggregations, because rollup requires inserting the intermediate type of - * complex aggregations, not the finalized type. (So further rollup is possible.) - * - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because - * groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time - * (rollup expects multi-value dimensions to be treated as arrays). - */ - private static boolean isRollupQuery(Query query) - { - return query instanceof GroupByQuery - && !MultiStageQueryContext.isFinalizeAggregations(query.context()) - && !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true); - } - /** * Compute shard columns for {@link DimensionRangeShardSpec}. Returns an empty list if range-based sharding * is not applicable. @@ -2147,197 +1990,6 @@ public class ControllerImpl implements Controller return new StringTuple(array); } - private static DimensionSchema getDimensionSchema( - final String outputColumnName, - @Nullable final ColumnType queryType, - QueryContext context, - @Nullable Map dimensionSchemas - ) - { - if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) { - return dimensionSchemas.get(outputColumnName); - } - // In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollu - - // we won't have an entry in the map. For those cases, use the default config. - return DimensionSchemaUtils.createDimensionSchema( - outputColumnName, - queryType, - MultiStageQueryContext.useAutoColumnSchemas(context), - MultiStageQueryContext.getArrayIngestMode(context) - ); - } - - private static Pair> makeDimensionsAndAggregatorsForIngestion( - final RowSignature querySignature, - final ClusterBy queryClusterBy, - final List contextSegmentSortOrder, - final boolean forceSegmentSortByTime, - final ColumnMappings columnMappings, - final boolean isRollupQuery, - final Query query, - @Nullable final Map dimensionSchemas - ) - { - // Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to - // deprecation and removal in future - if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) { - log.warn( - "%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as " - + "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be " - + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " - + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " - + "out multi-value string dimensions using ARRAY_TO_MV. " - + "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.", - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE - ); - } - - final List dimensions = new ArrayList<>(); - final List aggregators = new ArrayList<>(); - - // During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want - // this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in - // that order. - - // Start with segmentSortOrder. 
- final Set outputColumnsInOrder = new LinkedHashSet<>(contextSegmentSortOrder); - - // Then __time, if it's an output column and forceSegmentSortByTime is set. - if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME) && forceSegmentSortByTime) { - outputColumnsInOrder.add(ColumnHolder.TIME_COLUMN_NAME); - } - - // Then the query-level CLUSTERED BY. - // Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected. - // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows - // within an individual segment. - for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) { - final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()); - for (final int outputColumn : outputColumns) { - outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn)); - } - } - - // Then all other columns. - outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames()); - - Map outputColumnAggregatorFactories = new HashMap<>(); - - if (isRollupQuery) { - // Populate aggregators from the native query when doing an ingest in rollup mode. - for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { - for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) { - final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); - if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { - throw new ISE("There can only be one aggregation for column [%s].", outputColumn); - } else { - outputColumnAggregatorFactories.put( - outputColumnName, - aggregatorFactory.withName(outputColumnName).getCombiningFactory() - ); - } - } - } - } - - // Each column can be either a dimension or an aggregator. - // For non-complex columns, If the aggregator factory of the column is not available, we treat the column as - // a dimension. For complex columns, certains hacks are in place. - for (final String outputColumnName : outputColumnsInOrder) { - // CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require - // that output names be unique. 
- final int outputColumn = CollectionUtils.getOnlyElement( - columnMappings.getOutputColumnsByName(outputColumnName), - xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs) - ); - final String queryColumn = columnMappings.getQueryColumnName(outputColumn); - final ColumnType type = - querySignature.getColumnType(queryColumn) - .orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName)); - - if (!type.is(ValueType.COMPLEX)) { - // non complex columns - populateDimensionsAndAggregators( - dimensions, - aggregators, - outputColumnAggregatorFactories, - outputColumnName, - type, - query.context(), - dimensionSchemas - ); - } else { - // complex columns only - if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { - dimensions.add( - getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas) - ); - } else if (!isRollupQuery) { - aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); - } else { - populateDimensionsAndAggregators( - dimensions, - aggregators, - outputColumnAggregatorFactories, - outputColumnName, - type, - query.context(), - dimensionSchemas - ); - } - } - } - - final DimensionsSpec.Builder dimensionsSpecBuilder = DimensionsSpec.builder(); - - if (!dimensions.isEmpty() && dimensions.get(0).getName().equals(ColumnHolder.TIME_COLUMN_NAME)) { - // Skip __time if it's in the first position, for compatibility with legacy dimensionSpecs. - dimensions.remove(0); - dimensionsSpecBuilder.setForceSegmentSortByTime(null); - } else { - // Store explicit forceSegmentSortByTime only if false, for compatibility with legacy dimensionSpecs. - dimensionsSpecBuilder.setForceSegmentSortByTime(forceSegmentSortByTime ? null : false); - } - - return Pair.of(dimensionsSpecBuilder.setDimensions(dimensions).build(), aggregators); - } - - /** - * If the output column is present in the outputColumnAggregatorFactories that means we already have the aggregator information for this column. - * else treat this column as a dimension. - * - * @param dimensions list is poulated if the output col is deemed to be a dimension - * @param aggregators list is populated with the aggregator if the output col is deemed to be a aggregation column. - * @param outputColumnAggregatorFactories output col -> AggregatorFactory map - * @param outputColumn column name - * @param type columnType - */ - private static void populateDimensionsAndAggregators( - List dimensions, - List aggregators, - Map outputColumnAggregatorFactories, - String outputColumn, - ColumnType type, - QueryContext context, - Map dimensionSchemas - ) - { - if (ColumnHolder.TIME_COLUMN_NAME.equals(outputColumn)) { - if (!type.is(ValueType.LONG)) { - throw DruidException.defensive("Incorrect type[%s] for column[%s]", type, outputColumn); - } - dimensions.add(new LongDimensionSchema(outputColumn)); - } else if (outputColumnAggregatorFactories.containsKey(outputColumn)) { - aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); - } else { - dimensions.add( - getDimensionSchema(outputColumn, type, context, dimensionSchemas) - ); - } - } - private static DateTime getBucketDateTime( final ClusterByPartition partitionBoundary, final Granularity segmentGranularity, @@ -2815,7 +2467,8 @@ public class ControllerImpl implements Controller for (final StageId stageId : newStageIds) { // Allocate segments, if this is the final stage of an ingestion. 
if (MSQControllerTask.isIngestion(querySpec) - && stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber()) { + && stageId.getStageNumber() == queryDef.getFinalStageDefinition().getStageNumber() + && (((DataSourceMSQDestination) querySpec.getDestination()).getTerminalStageSpec() instanceof SegmentGenerationStageSpec)) { populateSegmentsToGenerate(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java index ea6eb364cec..a523fd6c012 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java @@ -54,6 +54,8 @@ public class MSQSqlModule implements DruidModule // We want this module to bring InputSourceModule along for the ride. binder.install(new InputSourceModule()); + binder.bind(MSQTerminalStageSpecFactory.class).toInstance(new MSQTerminalStageSpecFactory()); + binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class); // Set up the EXTERN macro. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java new file mode 100644 index 00000000000..d347322a7fb --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQTerminalStageSpecFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.guice; + +import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec; +import org.apache.druid.msq.indexing.destination.TerminalStageSpec; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidQuery; + +public class MSQTerminalStageSpecFactory +{ + /** + * Creates a {@link TerminalStageSpec} which determines the final stage of a query. Currently, this always returns a segment + * generation spec, but it can be used to configure a wide range of behaviours.
+ */ + public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext) + { + return SegmentGenerationStageSpec.instance(); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index fa011429763..5457e04286c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -243,7 +243,8 @@ public class MSQCompactionRunner implements CompactionRunner dataSchema.getDimensionsSpec() .getDimensions() .stream() - .collect(Collectors.toMap(DimensionSchema::getName, Function.identity())) + .collect(Collectors.toMap(DimensionSchema::getName, Function.identity())), + null ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java index 6276c588d9d..485bceb2d09 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java @@ -51,6 +51,8 @@ public class DataSourceMSQDestination implements MSQDestination @Nullable private final List replaceTimeChunks; + private final TerminalStageSpec terminalStageSpec; + @Nullable private final Map dimensionSchemas; @@ -60,7 +62,8 @@ public class DataSourceMSQDestination implements MSQDestination @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("segmentSortOrder") @Nullable List segmentSortOrder, @JsonProperty("replaceTimeChunks") @Nullable List replaceTimeChunks, - @JsonProperty("dimensionSchemas") @Nullable Map dimensionSchemas + @JsonProperty("dimensionSchemas") @Nullable Map dimensionSchemas, + @JsonProperty("terminalStageSpec") @Nullable TerminalStageSpec terminalStageSpec ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); @@ -68,6 +71,7 @@ public class DataSourceMSQDestination implements MSQDestination this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList(); this.replaceTimeChunks = replaceTimeChunks; this.dimensionSchemas = dimensionSchemas; + this.terminalStageSpec = terminalStageSpec != null ? terminalStageSpec : SegmentGenerationStageSpec.instance(); if (replaceTimeChunks != null) { // Verify that if replaceTimeChunks is provided, it is nonempty. @@ -105,6 +109,17 @@ public class DataSourceMSQDestination implements MSQDestination return dataSource; } + /** + * Returns the terminal stage spec. + *

+ * The terminal stage spec tells the MSQ task how to convert the query results into segments at the final stage. + */ + @JsonProperty + public TerminalStageSpec getTerminalStageSpec() + { + return terminalStageSpec; + } + @JsonProperty public Granularity getSegmentGranularity() { @@ -177,13 +192,14 @@ public class DataSourceMSQDestination implements MSQDestination && Objects.equals(segmentGranularity, that.segmentGranularity) && Objects.equals(segmentSortOrder, that.segmentSortOrder) && Objects.equals(replaceTimeChunks, that.replaceTimeChunks) - && Objects.equals(dimensionSchemas, that.dimensionSchemas); + && Objects.equals(dimensionSchemas, that.dimensionSchemas) + && Objects.equals(terminalStageSpec, that.terminalStageSpec); } @Override public int hashCode() { - return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas); + return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionSchemas, terminalStageSpec); } @Override @@ -195,6 +211,7 @@ public class DataSourceMSQDestination implements MSQDestination ", segmentSortOrder=" + segmentSortOrder + ", replaceTimeChunks=" + replaceTimeChunks + ", dimensionSchemas=" + dimensionSchemas + + ", terminalStageSpec=" + terminalStageSpec + '}'; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java new file mode 100644 index 00000000000..131d926288b --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; +import org.apache.druid.msq.kernel.controller.WorkerInputs; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.sql.calcite.planner.ColumnMappings; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class SegmentGenerationStageSpec implements TerminalStageSpec +{ + public static final String TYPE = "segmentGeneration"; + + private static final SegmentGenerationStageSpec INSTANCE = new SegmentGenerationStageSpec(); + + private SegmentGenerationStageSpec() + { + } + + @JsonCreator + public static SegmentGenerationStageSpec instance() + { + return INSTANCE; + } + + @Override + public StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper) + { + final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); + final ColumnMappings columnMappings = querySpec.getColumnMappings(); + final RowSignature querySignature = queryDef.getFinalStageDefinition().getSignature(); + final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); + + // Add a segment-generation stage. + final DataSchema dataSchema = + SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper); + + return StageDefinition.builder(queryDef.getNextStageNumber()) + .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) + .maxWorkerCount(tuningConfig.getMaxNumWorkers()) + .processorFactory( + new SegmentGeneratorFrameProcessorFactory( + dataSchema, + columnMappings, + tuningConfig + ) + ); + } + + public Int2ObjectMap> getWorkerInfo( + final WorkerInputs workerInputs, + @Nullable final List segmentsToGenerate + ) + { + final Int2ObjectMap> retVal = new Int2ObjectAVLTreeMap<>(); + + // Empty segments validation already happens when the stages are started -- so we cannot have both + // isFailOnEmptyInsertEnabled and segmentsToGenerate.isEmpty() be true here. + if (segmentsToGenerate == null || segmentsToGenerate.isEmpty()) { + return retVal; + } + + for (final int workerNumber : workerInputs.workers()) { + // SegmentGenerator stage has a single input from another stage. 
+ final StageInputSlice stageInputSlice = + (StageInputSlice) Iterables.getOnlyElement(workerInputs.inputsForWorker(workerNumber)); + + final List workerSegments = new ArrayList<>(); + retVal.put(workerNumber, workerSegments); + + for (final ReadablePartition partition : stageInputSlice.getPartitions()) { + workerSegments.add(segmentsToGenerate.get(partition.getPartitionNumber())); + } + } + + return retVal; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java new file mode 100644 index 00000000000..09d79534337 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationUtils.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.util.ArrayIngestMode; +import org.apache.druid.msq.util.DimensionSchemaUtils; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.msq.util.PassthroughAggregatorFactory; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.indexing.DataSchema; +import 
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.rel.DruidQuery; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public final class SegmentGenerationUtils +{ + private static final Logger log = new Logger(SegmentGenerationUtils.class); + + public static DataSchema makeDataSchemaForIngestion( + MSQSpec querySpec, + RowSignature querySignature, + ClusterBy queryClusterBy, + ColumnMappings columnMappings, + ObjectMapper jsonMapper + ) + { + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + final boolean isRollupQuery = isRollupQuery(querySpec.getQuery()); + final boolean forceSegmentSortByTime = + MultiStageQueryContext.isForceSegmentSortByTime(querySpec.getQuery().context()); + + final Pair> dimensionsAndAggregators = + makeDimensionsAndAggregatorsForIngestion( + querySignature, + queryClusterBy, + destination.getSegmentSortOrder(), + forceSegmentSortByTime, + columnMappings, + isRollupQuery, + querySpec.getQuery(), + destination.getDimensionSchemas() + ); + + return new DataSchema( + destination.getDataSource(), + new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), + dimensionsAndAggregators.lhs, + dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]), + makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper), + new TransformSpec(null, Collections.emptyList()) + ); + } + + private static GranularitySpec makeGranularitySpecForIngestion( + final Query query, + final ColumnMappings columnMappings, + final boolean isRollupQuery, + final ObjectMapper jsonMapper + ) + { + if (isRollupQuery) { + final String queryGranularityString = + query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, ""); + + if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) { + final Granularity queryGranularity; + + try { + queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY); + } + return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY); + } else { + return new ArbitraryGranularitySpec(Granularities.NONE, false, Intervals.ONLY_ETERNITY); + } + } + + /** + * Checks that a {@link GroupByQuery} is grouping on the primary time column. + *

+ * The logic here is roundabout. First, we check which column in the {@link GroupByQuery} corresponds to the + * output column {@link ColumnHolder#TIME_COLUMN_NAME}, using our {@link ColumnMappings}. Then, we check for the + * presence of an optimization done in {@link DruidQuery#toGroupByQuery()}, where the context parameter + * {@link GroupByQuery#CTX_TIMESTAMP_RESULT_FIELD} and various related parameters are set when one of the dimensions + * is detected to be a time-floor. Finally, we check that the name of that dimension, and the name of our time field + * from {@link ColumnMappings}, are the same. + */ + private static boolean timeIsGroupByDimension(GroupByQuery groupByQuery, ColumnMappings columnMappings) + { + final IntList positions = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME); + + if (positions.size() == 1) { + final String queryTimeColumn = columnMappings.getQueryColumnName(positions.getInt(0)); + return queryTimeColumn.equals(groupByQuery.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD)); + } else { + return false; + } + } + + /** + * Whether a native query represents an ingestion with rollup. + *

+ * Checks for three things: + *

+ * - The query must be a {@link GroupByQuery}, because rollup requires columns to be split into dimensions and + * aggregations. + * - The query must not finalize aggregations, because rollup requires inserting the intermediate type of + * complex aggregations, not the finalized type. (So further rollup is possible.) + * - The query must explicitly disable {@link GroupByQueryConfig#CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING}, because + * groupBy on multi-value dimensions implicitly unnests, which is not desired behavior for rollup at ingestion time + * (rollup expects multi-value dimensions to be treated as arrays). + */ + private static boolean isRollupQuery(Query query) + { + return query instanceof GroupByQuery + && !MultiStageQueryContext.isFinalizeAggregations(query.context()) + && !query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, true); + } + + private static DimensionSchema getDimensionSchema( + final String outputColumnName, + @Nullable final ColumnType queryType, + QueryContext context, + @Nullable Map dimensionSchemas + ) + { + if (dimensionSchemas != null && dimensionSchemas.containsKey(outputColumnName)) { + return dimensionSchemas.get(outputColumnName); + } + // In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup - + // we won't have an entry in the map. For those cases, use the default config. + return DimensionSchemaUtils.createDimensionSchema( + outputColumnName, + queryType, + MultiStageQueryContext.useAutoColumnSchemas(context), + MultiStageQueryContext.getArrayIngestMode(context) + ); + } + + private static Pair> makeDimensionsAndAggregatorsForIngestion( + final RowSignature querySignature, + final ClusterBy queryClusterBy, + final List contextSegmentSortOrder, + final boolean forceSegmentSortByTime, + final ColumnMappings columnMappings, + final boolean isRollupQuery, + final Query query, + @Nullable final Map dimensionSchemas + ) + { + // Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to + // deprecation and removal in future + if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) { + log.warn( + "%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as " + + "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be " + + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " + + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " + + "out multi-value string dimensions using ARRAY_TO_MV. " + + "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.", + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE + ); + } + + final List dimensions = new ArrayList<>(); + final List aggregators = new ArrayList<>(); + + // During ingestion, segment sort order is determined by the order of fields in the DimensionsSchema. We want + // this to match user intent as dictated by the declared segment sort order and CLUSTERED BY, so add things in + // that order. + + // Start with segmentSortOrder. + final Set outputColumnsInOrder = new LinkedHashSet<>(contextSegmentSortOrder); + + // Then __time, if it's an output column and forceSegmentSortByTime is set.
+ if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME) && forceSegmentSortByTime) { + outputColumnsInOrder.add(ColumnHolder.TIME_COLUMN_NAME); + } + + // Then the query-level CLUSTERED BY. + // Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected. + // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows + // within an individual segment. + for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) { + final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()); + for (final int outputColumn : outputColumns) { + outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn)); + } + } + + // Then all other columns. + outputColumnsInOrder.addAll(columnMappings.getOutputColumnNames()); + + Map outputColumnAggregatorFactories = new HashMap<>(); + + if (isRollupQuery) { + // Populate aggregators from the native query when doing an ingest in rollup mode. + for (AggregatorFactory aggregatorFactory : ((GroupByQuery) query).getAggregatorSpecs()) { + for (final int outputColumn : columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) { + final String outputColumnName = columnMappings.getOutputColumnName(outputColumn); + if (outputColumnAggregatorFactories.containsKey(outputColumnName)) { + throw new ISE("There can only be one aggregation for column [%s].", outputColumn); + } else { + outputColumnAggregatorFactories.put( + outputColumnName, + aggregatorFactory.withName(outputColumnName).getCombiningFactory() + ); + } + } + } + } + + // Each column can be either a dimension or an aggregator. + // For non-complex columns, if the aggregator factory of the column is not available, we treat the column as + // a dimension. For complex columns, certain hacks are in place. + for (final String outputColumnName : outputColumnsInOrder) { + // CollectionUtils.getOnlyElement because this method is only called during ingestion, where we require + // that output names be unique.
+ final int outputColumn = CollectionUtils.getOnlyElement( + columnMappings.getOutputColumnsByName(outputColumnName), + xs -> new ISE("Expected single output column for name [%s], but got [%s]", outputColumnName, xs) + ); + final String queryColumn = columnMappings.getQueryColumnName(outputColumn); + final ColumnType type = + querySignature.getColumnType(queryColumn) + .orElseThrow(() -> new ISE("No type for column [%s]", outputColumnName)); + + if (!type.is(ValueType.COMPLEX)) { + // non complex columns + populateDimensionsAndAggregators( + dimensions, + aggregators, + outputColumnAggregatorFactories, + outputColumnName, + type, + query.context(), + dimensionSchemas + ); + } else { + // complex columns only + if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { + dimensions.add( + getDimensionSchema(outputColumnName, type, query.context(), dimensionSchemas) + ); + } else if (!isRollupQuery) { + aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); + } else { + populateDimensionsAndAggregators( + dimensions, + aggregators, + outputColumnAggregatorFactories, + outputColumnName, + type, + query.context(), + dimensionSchemas + ); + } + } + } + + final DimensionsSpec.Builder dimensionsSpecBuilder = DimensionsSpec.builder(); + + if (!dimensions.isEmpty() && dimensions.get(0).getName().equals(ColumnHolder.TIME_COLUMN_NAME)) { + // Skip __time if it's in the first position, for compatibility with legacy dimensionSpecs. + dimensions.remove(0); + dimensionsSpecBuilder.setForceSegmentSortByTime(null); + } else { + // Store explicit forceSegmentSortByTime only if false, for compatibility with legacy dimensionSpecs. + dimensionsSpecBuilder.setForceSegmentSortByTime(forceSegmentSortByTime ? null : false); + } + + return Pair.of(dimensionsSpecBuilder.setDimensions(dimensions).build(), aggregators); + } + + /** + * If the output column is present in the outputColumnAggregatorFactories, that means we already have the aggregator information for this column; + * otherwise, treat this column as a dimension. + * + * @param dimensions list is populated if the output col is deemed to be a dimension + * @param aggregators list is populated with the aggregator if the output col is deemed to be an aggregation column.
+ * @param outputColumnAggregatorFactories output col -> AggregatorFactory map + * @param outputColumn column name + * @param type columnType + */ + private static void populateDimensionsAndAggregators( + List dimensions, + List aggregators, + Map outputColumnAggregatorFactories, + String outputColumn, + ColumnType type, + QueryContext context, + Map dimensionSchemas + ) + { + if (ColumnHolder.TIME_COLUMN_NAME.equals(outputColumn)) { + if (!type.is(ValueType.LONG)) { + throw DruidException.defensive("Incorrect type[%s] for column[%s]", type, outputColumn); + } + dimensions.add(new LongDimensionSchema(outputColumn)); + } else if (outputColumnAggregatorFactories.containsKey(outputColumn)) { + aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); + } else { + dimensions.add( + getDimensionSchema(outputColumn, type, context, dimensionSchemas) + ); + } + } + + private SegmentGenerationUtils() + { + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java new file mode 100644 index 00000000000..6bae67954fd --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/TerminalStageSpec.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.destination; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.StageDefinitionBuilder; + +/** + * Determines the final stage of a {@link DataSourceMSQDestination}. 
+ */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = SegmentGenerationStageSpec.TYPE, value = SegmentGenerationStageSpec.class) +}) +public interface TerminalStageSpec +{ + StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper); +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 60fa197c4a5..830fb87e1b2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.msq.exec.MSQTasks; +import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; @@ -92,6 +93,7 @@ public class MSQTaskQueryMaker implements QueryMaker private final PlannerContext plannerContext; private final ObjectMapper jsonMapper; private final List> fieldMapping; + private final MSQTerminalStageSpecFactory terminalStageSpecFactory; MSQTaskQueryMaker( @@ -99,7 +101,8 @@ public class MSQTaskQueryMaker implements QueryMaker final OverlordClient overlordClient, final PlannerContext plannerContext, final ObjectMapper jsonMapper, - final List> fieldMapping + final List> fieldMapping, + final MSQTerminalStageSpecFactory terminalStageSpecFactory ) { this.targetDataSource = targetDataSource; @@ -107,6 +110,7 @@ public class MSQTaskQueryMaker implements QueryMaker this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext"); this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping"); + this.terminalStageSpecFactory = terminalStageSpecFactory; } @Override @@ -247,7 +251,11 @@ public class MSQTaskQueryMaker implements QueryMaker segmentGranularityObject, segmentSortOrder, replaceTimeChunks, - null + null, + terminalStageSpecFactory.createTerminalStageSpec( + druidQuery, + plannerContext + ) ); MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, dataSourceDestination.isReplaceTimeChunks()); destination = dataSourceDestination; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index d20abffb0f9..24fbd373c95 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.sql; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; @@ -42,6 +43,7 @@ import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; import 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 60fa197c4a5..830fb87e1b2 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.exec.MSQTasks;
+import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
 import org.apache.druid.msq.indexing.MSQControllerTask;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
@@ -92,6 +93,7 @@ public class MSQTaskQueryMaker implements QueryMaker
   private final PlannerContext plannerContext;
   private final ObjectMapper jsonMapper;
   private final List<Entry<Integer, String>> fieldMapping;
+  private final MSQTerminalStageSpecFactory terminalStageSpecFactory;

   MSQTaskQueryMaker(
@@ -99,7 +101,8 @@ public class MSQTaskQueryMaker implements QueryMaker
       final OverlordClient overlordClient,
       final PlannerContext plannerContext,
       final ObjectMapper jsonMapper,
-      final List<Entry<Integer, String>> fieldMapping
+      final List<Entry<Integer, String>> fieldMapping,
+      final MSQTerminalStageSpecFactory terminalStageSpecFactory
   )
   {
     this.targetDataSource = targetDataSource;
@@ -107,6 +110,7 @@ public class MSQTaskQueryMaker implements QueryMaker
     this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
     this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
     this.fieldMapping = Preconditions.checkNotNull(fieldMapping, "fieldMapping");
+    this.terminalStageSpecFactory = terminalStageSpecFactory;
   }

   @Override
@@ -247,7 +251,11 @@ public class MSQTaskQueryMaker implements QueryMaker
           segmentGranularityObject,
           segmentSortOrder,
           replaceTimeChunks,
-          null
+          null,
+          terminalStageSpecFactory.createTerminalStageSpec(
+              druidQuery,
+              plannerContext
+          )
       );
       MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, dataSourceDestination.isReplaceTimeChunks());
       destination = dataSourceDestination;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index d20abffb0f9..24fbd373c95 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.sql;

 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
@@ -42,6 +43,7 @@ import org.apache.druid.error.InvalidSqlInput;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
 import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.msq.util.ArrayIngestMode;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
@@ -85,15 +87,24 @@ public class MSQTaskSqlEngine implements SqlEngine
   private final OverlordClient overlordClient;
   private final ObjectMapper jsonMapper;
+  private final MSQTerminalStageSpecFactory terminalStageSpecFactory;

   @Inject
+  public MSQTaskSqlEngine(final OverlordClient overlordClient, final ObjectMapper jsonMapper)
+  {
+    this(overlordClient, jsonMapper, new MSQTerminalStageSpecFactory());
+  }
+
+  @VisibleForTesting
   public MSQTaskSqlEngine(
       final OverlordClient overlordClient,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final MSQTerminalStageSpecFactory terminalStageSpecFactory
   )
   {
     this.overlordClient = overlordClient;
     this.jsonMapper = jsonMapper;
+    this.terminalStageSpecFactory = terminalStageSpecFactory;
   }

   @Override
@@ -162,7 +173,8 @@ public class MSQTaskSqlEngine implements SqlEngine
         overlordClient,
         plannerContext,
         jsonMapper,
-        relRoot.fields
+        relRoot.fields,
+        terminalStageSpecFactory
     );
   }

@@ -195,7 +207,8 @@ public class MSQTaskSqlEngine implements SqlEngine
         overlordClient,
         plannerContext,
         jsonMapper,
-        relRoot.fields
+        relRoot.fields,
+        terminalStageSpecFactory
     );
   }
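The pair of constructors above keeps the Guice-injected entry point unchanged while allowing an alternate factory to be supplied. A minimal wiring sketch follows; it assumes MSQTerminalStageSpecFactory#createTerminalStageSpec(DruidQuery, PlannerContext) is overridable (the signature is inferred from the MSQTaskQueryMaker call site above), reuses the hypothetical DelegatingTerminalStageSpec from the earlier sketch, and uses overlordClient and jsonMapper as stand-ins for injector-provided instances:

    // Sketch only: substitute a custom factory for the default one via the
    // @VisibleForTesting constructor added above.
    MSQTerminalStageSpecFactory factory = new MSQTerminalStageSpecFactory()
    {
      @Override
      public TerminalStageSpec createTerminalStageSpec(DruidQuery druidQuery, PlannerContext plannerContext)
      {
        return new DelegatingTerminalStageSpec(); // hypothetical spec from the earlier sketch
      }
    };
    SqlEngine engine = new MSQTaskSqlEngine(overlordClient, jsonMapper, factory);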
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java
index 8de47936eb4..f821c169505 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/PassthroughAggregatorFactory.java
@@ -39,7 +39,7 @@ import java.util.Objects;
  * Hack that allows "passing through" arbitrary complex types into
  * {@link org.apache.druid.segment.incremental.IncrementalIndex}.
  *
- * Used by {@link org.apache.druid.msq.exec.ControllerImpl#makeDimensionsAndAggregatorsForIngestion}.
+ * Used by {@link org.apache.druid.msq.indexing.destination.SegmentGenerationUtils#makeDimensionsAndAggregatorsForIngestion}.
  *
  * To move away from this, it would need to be possible to create complex columns in segments only knowing the complex
  * type; in particular, without knowing the type of an aggregator factory or dimension schema that corresponds to
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
index bc8d517ffba..879da23977e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
@@ -225,7 +225,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
                         new ColumnMapping("v1", "agent_category")
                     )
                 ))
-                .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
+                .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null))
                 .tuningConfig(MSQTuningConfig.defaultConfig())
                 .build())
         .setQueryContext(DEFAULT_MSQ_CONTEXT)
@@ -318,7 +318,7 @@
                         new ColumnMapping("agent_category", "agent_category")
                     )
                 ))
-                .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
+                .destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null))
                 .tuningConfig(MSQTuningConfig.defaultConfig())
                 .build())
         .setQueryContext(runtimeContext)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index e470a951877..72541046913 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -1375,7 +1375,6 @@ public class MSQReplaceTest extends MSQTestBase
                          + "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' "
                          + "PARTITIONED BY MONTH")
                      .setExpectedDataSource("foo")
-                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedRowSignature(rowSignature)
                      .setQueryContext(context)
                      .setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2002-01-01T")))
@@ -1432,7 +1431,6 @@
                          + "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' "
                          + "PARTITIONED BY MONTH")
                      .setExpectedDataSource("foo")
-                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedRowSignature(rowSignature)
                      .setQueryContext(context)
                      .setExpectedDestinationIntervals(Collections.singletonList(Intervals.ETERNITY))
@@ -1480,7 +1478,6 @@
                          "REPLACE INTO foo1 OVERWRITE ALL "
                          + "select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1")
                      .setExpectedDataSource("foo1")
-                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedShardSpec(DimensionRangeShardSpec.class)
                      .setExpectedMSQSegmentReport(
                          new MSQSegmentReport(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 94be3495b98..09c5ae47718 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -290,7 +290,8 @@ public class MSQCompactionRunnerTest
             SEGMENT_GRANULARITY.getDefaultGranularity(),
             null,
             Collections.singletonList(COMPACTION_INTERVAL),
-            DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
+            DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())),
+            null
         ),
         actualMSQSpec.getDestination()
     );
@@ -360,7 +361,8 @@ public class MSQCompactionRunnerTest
             SEGMENT_GRANULARITY.getDefaultGranularity(),
             null,
             Collections.singletonList(COMPACTION_INTERVAL),
-            DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
+            DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())),
+            null
         ),
         actualMSQSpec.getDestination()
     );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index e969e209387..76586c1e108 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -59,6 +59,7 @@ public class MSQControllerTaskTest
               Granularities.DAY,
               null,
               INTERVALS,
+              null,
               null
           ))
           .query(new Druids.ScanQueryBuilder()
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
index 331430181a9..6b843df9187 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
@@ -20,10 +20,14 @@

 package org.apache.druid.msq.indexing.destination;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.junit.Assert;
 import org.junit.Test;

 import java.util.Map;
@@ -58,4 +62,17 @@ public class DataSourceMSQDestinationTest
         .usingGetClass()
         .verify();
   }
+
+  @Test
+  public void testBackwardCompatibility() throws JsonProcessingException
+  {
+    DataSourceMSQDestination destination = new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null);
+    Assert.assertEquals(SegmentGenerationStageSpec.instance(), destination.getTerminalStageSpec());
+
+    DataSourceMSQDestination dataSourceMSQDestination = new DefaultObjectMapper().readValue(
+        "{\"type\":\"dataSource\",\"dataSource\":\"datasource1\",\"segmentGranularity\":\"DAY\",\"rowsInTaskReport\":0,\"destinationResource\":{\"empty\":false,\"present\":true}}",
+        DataSourceMSQDestination.class
+    );
+    Assert.assertEquals(SegmentGenerationStageSpec.instance(), dataSourceMSQDestination.getTerminalStageSpec());
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 2a753e21d16..a79edee53b8 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -200,6 +200,7 @@ public class SqlStatementResourceTest extends MSQTestBase
                   Granularities.DAY,
                   null,
                   null,
+                  null,
                   null
               ))
               .tuningConfig(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 258b5c97751..fe725528f40 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -91,6 +91,7 @@ import org.apache.druid.msq.guice.MSQDurableStorageModule;
 import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.msq.guice.MSQSqlModule;
+import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
 import org.apache.druid.msq.guice.MultiStageQuery;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.indexing.MSQControllerTask;
@@ -210,7 +211,6 @@ import org.mockito.Mockito;

 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -550,7 +550,8 @@ public class MSQTestBase extends BaseCalciteQueryTest

     final SqlEngine engine = new MSQTaskSqlEngine(
         indexingServiceClient,
-        qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules())
+        qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()),
+        new MSQTerminalStageSpecFactory()
     );

     PlannerFactory plannerFactory = new PlannerFactory(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java
index 61a4389e93d..73fca53682c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java
@@ -40,7 +40,7 @@ public class MSQTestDelegateDataSegmentPusher implements DataSegmentPusher
       MSQTestSegmentManager segmentManager
   )
   {
-    delegate = dataSegmentPusher;
+    this.delegate = dataSegmentPusher;
     this.segmentManager = segmentManager;
   }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 58856adf366..7fb6bc687a8 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -376,6 +376,7 @@ public class SqlStatementResourceHelperTest
             Granularities.DAY,
             null,
             null,
+            null,
             null
         )
     );
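Taken together, these test updates pin down the compatibility contract of the new trailing constructor argument. Condensed from DataSourceMSQDestinationTest above:

    // Passing null for the new TerminalStageSpec argument yields the default
    // segment-generation behavior, so pre-existing callers and previously
    // serialized specs behave exactly as they did before this patch.
    DataSourceMSQDestination destination =
        new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null, null);
    Assert.assertEquals(SegmentGenerationStageSpec.instance(), destination.getTerminalStageSpec());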