Add support for a custom `DimensionSchema` in `DataSourceMSQDestination` (#16864)

This PR adds support for passing a custom `DimensionSchema` map to an MSQ query destination of type `DataSourceMSQDestination`. Output columns present in the map use the supplied schema; all other columns fall back to the default schema derived from the query context.
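For illustration only (not part of the patch), here is a minimal sketch of how a caller could build the destination with explicit per-dimension schemas using the new five-argument constructor from this commit. The data source name, column names, and granularity below are hypothetical; the map-building idiom mirrors the one used in `MSQCompactionRunner`.

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CustomDimensionSchemaExample
{
  public static DataSourceMSQDestination buildDestination()
  {
    // Explicit schemas for two output columns; any column not present in this map falls back
    // to the default schema that the controller derives from the query context.
    final Map<String, DimensionSchema> dimensionToSchemaMap = ImmutableList.<DimensionSchema>of(
        new StringDimensionSchema("language", DimensionSchema.MultiValueHandling.SORTED_ARRAY, false),
        new LongDimensionSchema("added")
    ).stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()));

    return new DataSourceMSQDestination(
        "wikipedia",          // dataSource (hypothetical)
        Granularities.DAY,    // segmentGranularity
        null,                 // segmentSortOrder; null defaults to an empty list
        null,                 // replaceTimeChunks; null means no replace-time-chunks mode
        dimensionToSchemaMap  // new: nullable dimension-to-schema overrides
    );
  }
}
```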
Vishesh Garg 2024-08-16 15:24:49 +05:30 committed by GitHub
parent 5b94839d9d
commit e37fe93f09
12 changed files with 167 additions and 38 deletions

View File: ControllerImpl.java

@@ -1950,7 +1950,8 @@ public class ControllerImpl implements Controller
destination.getSegmentSortOrder(),
columnMappings,
isRollupQuery,
querySpec.getQuery()
querySpec.getQuery(),
destination.getDimensionToSchemaMap()
);
return new DataSchema(
@@ -2122,13 +2123,34 @@ public class ControllerImpl implements Controller
return new StringTuple(array);
}
private static DimensionSchema getDimensionSchema(
final String outputColumnName,
@Nullable final ColumnType queryType,
QueryContext context,
@Nullable Map<String, DimensionSchema> dimensionToSchemaMap
)
{
if (dimensionToSchemaMap != null && dimensionToSchemaMap.containsKey(outputColumnName)) {
return dimensionToSchemaMap.get(outputColumnName);
}
// In case of ingestion, or when metrics are converted to dimensions when compaction is performed without rollup,
// we won't have an entry in the map. For those cases, use the default config.
return DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
queryType,
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.getArrayIngestMode(context)
);
}
private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensionsAndAggregatorsForIngestion(
final RowSignature querySignature,
final ClusterBy queryClusterBy,
final List<String> segmentSortOrder,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
final Query<?> query
final Query<?> query,
@Nullable final Map<String, DimensionSchema> dimensionToSchemaMap
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
@@ -2214,18 +2236,14 @@ public class ControllerImpl implements Controller
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context()
query.context(),
dimensionToSchemaMap
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
type,
MultiStageQueryContext.useAutoColumnSchemas(query.context()),
MultiStageQueryContext.getArrayIngestMode(query.context())
)
getDimensionSchema(outputColumnName, type, query.context(), dimensionToSchemaMap)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
@@ -2236,7 +2254,8 @@ public class ControllerImpl implements Controller
outputColumnAggregatorFactories,
outputColumnName,
type,
query.context()
query.context(),
dimensionToSchemaMap
);
}
}
@@ -2263,19 +2282,15 @@ public class ControllerImpl implements Controller
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type,
QueryContext context
QueryContext context,
Map<String, DimensionSchema> dimensionToSchemaMap
)
{
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumn,
type,
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.getArrayIngestMode(context)
)
getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap)
);
}
}

View File: MSQCompactionRunner.java

@@ -81,6 +81,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
public class MSQCompactionRunner implements CompactionRunner
@@ -237,7 +238,11 @@ public class MSQCompactionRunner implements CompactionRunner
dataSchema.getDataSource(),
dataSchema.getGranularitySpec().getSegmentGranularity(),
null,
ImmutableList.of(replaceInterval)
ImmutableList.of(replaceInterval),
dataSchema.getDimensionsSpec()
.getDimensions()
.stream()
.collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
);
}
@@ -494,9 +499,10 @@ public class MSQCompactionRunner implements CompactionRunner
// Used for writing the data schema during segment generation phase.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
// Add appropriate finalization to native query context i.e. for the GroupBy query
context.put(QueryContexts.FINALIZE_KEY, false);
context.putIfAbsent(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
return context;
}

View File: MSQSpec.java

@@ -28,7 +28,6 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
@@ -43,7 +42,7 @@ public class MSQSpec
@JsonCreator
public MSQSpec(
@JsonProperty("query") Query<?> query,
@JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings,
@JsonProperty("columnMappings") ColumnMappings columnMappings,
@JsonProperty("destination") MSQDestination destination,
@JsonProperty("assignmentStrategy") WorkerAssignmentStrategy assignmentStrategy,
@JsonProperty("tuningConfig") MSQTuningConfig tuningConfig

View File: DataSourceMSQDestination.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -35,6 +36,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -49,18 +51,23 @@ public class DataSourceMSQDestination implements MSQDestination
@Nullable
private final List<Interval> replaceTimeChunks;
@Nullable
private final Map<String, DimensionSchema> dimensionToSchemaMap;
@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String> segmentSortOrder,
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks
@JsonProperty("replaceTimeChunks") @Nullable List<Interval> replaceTimeChunks,
@JsonProperty("dimensionToSchemaMap") @Nullable Map<String, DimensionSchema> dimensionToSchemaMap
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity, "segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder : Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
this.dimensionToSchemaMap = dimensionToSchemaMap;
if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
@@ -125,6 +132,17 @@ public class DataSourceMSQDestination implements MSQDestination
return replaceTimeChunks;
}
/**
* Returns the map of dimension name to its schema.
*/
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, DimensionSchema> getDimensionToSchemaMap()
{
return dimensionToSchemaMap;
}
/**
* Whether this object is in replace-existing-time-chunks mode.
*/
@@ -158,13 +176,14 @@ public class DataSourceMSQDestination implements MSQDestination
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
&& Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks);
return Objects.hash(dataSource, segmentGranularity, segmentSortOrder, replaceTimeChunks, dimensionToSchemaMap);
}
@Override
@@ -175,6 +194,7 @@ public class DataSourceMSQDestination implements MSQDestination
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
", dimensionToSchemaMap=" + dimensionToSchemaMap +
'}';
}
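Because `dimensionToSchemaMap` is a nullable `@JsonProperty` with `@JsonInclude(NON_NULL)`, the field is only written when set, so older serialized destinations remain readable. Below is a rough serde sketch, assuming Druid's `DefaultObjectMapper` and the annotation-registered `MSQDestination` subtypes handle the polymorphic types here; the class and values are illustrative, not taken from the patch.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;

public class DestinationSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();

    final DataSourceMSQDestination destination = new DataSourceMSQDestination(
        "wikipedia",          // hypothetical data source
        Granularities.DAY,    // segmentGranularity
        null,                 // segmentSortOrder
        null,                 // replaceTimeChunks
        ImmutableMap.of(
            "language",
            new StringDimensionSchema("language", DimensionSchema.MultiValueHandling.SORTED_ARRAY, false)
        )
    );

    // Serialize, then read back through the MSQDestination interface. Since equals()/hashCode()
    // now include dimensionToSchemaMap (see diff above), a faithful round trip should compare equal.
    final String json = mapper.writeValueAsString(destination);
    final MSQDestination roundTripped = mapper.readValue(json, MSQDestination.class);
    System.out.println(destination.equals(roundTripped)); // expected: true
  }
}
```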

View File: MSQTaskQueryMaker.java

@@ -246,7 +246,8 @@ public class MSQTaskQueryMaker implements QueryMaker
targetDataSource.getDestinationName(),
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
replaceTimeChunks,
null
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
dataSourceMSQDestination.isReplaceTimeChunks());

View File: MSQParseExceptionsTest.java

@@ -225,7 +225,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("v1", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
@@ -318,7 +318,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("agent_category", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(runtimeContext)

View File: MSQCompactionRunnerTest.java

@@ -74,6 +74,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class MSQCompactionRunnerTest
@@ -87,13 +88,9 @@ public class MSQCompactionRunnerTest
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
private static List<String> PARTITION_DIMENSIONS;
private static final StringDimensionSchema DIM1 = new StringDimensionSchema(
"string_dim",
null,
null
);
private static final LongDimensionSchema LONG_DIMENSION_SCHEMA = new LongDimensionSchema("long_dim");
private static final List<DimensionSchema> DIMENSIONS = ImmutableList.of(DIM1, LONG_DIMENSION_SCHEMA);
private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, null);
private static final LongDimensionSchema LONG_DIMENSION = new LongDimensionSchema("long_dim");
private static final List<DimensionSchema> DIMENSIONS = ImmutableList.of(STRING_DIMENSION, LONG_DIMENSION);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
@@ -291,7 +288,8 @@ public class MSQCompactionRunnerTest
DATA_SOURCE,
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
Collections.singletonList(COMPACTION_INTERVAL)
Collections.singletonList(COMPACTION_INTERVAL),
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
),
actualMSQSpec.getDestination()
);
@@ -360,7 +358,8 @@ public class MSQCompactionRunnerTest
DATA_SOURCE,
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
Collections.singletonList(COMPACTION_INTERVAL)
Collections.singletonList(COMPACTION_INTERVAL),
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity()))
),
actualMSQSpec.getDestination()
);

View File: MSQControllerTaskTest.java

@@ -58,7 +58,8 @@ public class MSQControllerTaskTest
"target",
Granularities.DAY,
null,
INTERVALS
INTERVALS,
null
))
.query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)

View File: DataSourceMSQDestinationTest.java

@@ -20,9 +20,14 @@
package org.apache.druid.msq.indexing.destination;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.junit.Test;
import java.util.Map;
public class DataSourceMSQDestinationTest
{
@@ -30,7 +35,26 @@ public class DataSourceMSQDestinationTest
public void testEquals()
{
EqualsVerifier.forClass(DataSourceMSQDestination.class)
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder")
.withNonnullFields("dataSource", "segmentGranularity", "segmentSortOrder", "dimensionToSchemaMap")
.withPrefabValues(
Map.class,
ImmutableMap.of(
"language",
new StringDimensionSchema(
"language",
DimensionSchema.MultiValueHandling.SORTED_ARRAY,
false
)
),
ImmutableMap.of(
"region",
new StringDimensionSchema(
"region",
DimensionSchema.MultiValueHandling.SORTED_ARRAY,
false
)
)
)
.usingGetClass()
.verify();
}

View File: SqlStatementResourceTest.java

@@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase
"test",
Granularities.DAY,
null,
null,
null
))
.tuningConfig(

View File: SqlStatementResourceHelperTest.java

@@ -375,6 +375,7 @@ public class SqlStatementResourceHelperTest
"test",
Granularities.DAY,
null,
null,
null
)
);

View File: ITAutoCompactionTest.java

@@ -25,7 +25,9 @@ import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -52,6 +54,8 @@ import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggrega
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -515,6 +519,42 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test(dataProvider = "engine")
public void testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true");
Granularity newSegmentGranularity = Granularities.YEAR;
List<DimensionSchema> dimensionSchemas = ImmutableList.of(
new StringDimensionSchema("language", DimensionSchema.MultiValueHandling.SORTED_ARRAY, false),
new AutoTypeColumnSchema("deleted", ColumnType.DOUBLE)
);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newSegmentGranularity, null, true),
new UserCompactionTaskDimensionsConfig(dimensionSchemas),
null,
new AggregatorFactory[] {new LongSumAggregatorFactory("added", "added")},
true,
engine
);
//...compacted into 1 segment for the entire year.
forceTriggerAutoCompaction(1);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
verifySegmentsCompactedDimensionSchema(dimensionSchemas);
}
}
@Test
public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
{
@@ -1941,6 +1981,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(), partitionsSpec);
}
}
private void verifySegmentsCompactedDimensionSchema(List<DimensionSchema> dimensionSchemas)
{
List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
List<DataSegment> foundCompactedSegments = new ArrayList<>();
for (DataSegment segment : segments) {
if (segment.getLastCompactionState() != null) {
foundCompactedSegments.add(segment);
}
}
for (DataSegment compactedSegment : foundCompactedSegments) {
MatcherAssert.assertThat(
dimensionSchemas,
Matchers.containsInAnyOrder(
compactedSegment.getLastCompactionState()
.getDimensionsSpec()
.getDimensions()
.toArray(new DimensionSchema[0]))
);
}
}
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception