Enable compaction ITs on MSQ engine (#16778)

As a follow-up to #16291, this commit enables a subset of the existing native compaction ITs on the MSQ engine.

In the process, the following changes have been introduced in the MSQ compaction flow:
- Populate `metricsSpec` in `CompactionState` from `querySpec` in `MSQControllerTask` instead of `dataSchema`
- Add a check that rejects compaction of pre-rolled-up segments when an `AggregatorFactory` has different input and output column names (see the sketch after this list)
- Fix the scan query so that the previously missing cluster-by clause is passed as an order-by
- Annotate tombstone segments with `CompactionState`
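
A minimal, standalone sketch (not part of this patch) of the idempotence condition behind the new pre-check on rolled-up segments, assuming Druid's `LongSumAggregatorFactory`; the class name and the example column names are illustrative only:

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class RolledUpAggregatorCheckSketch
{
  // Mirrors the conservative condition added in MSQCompactionRunner#validateRolledUpSegments:
  // the factory's class must equal its combining factory's class, and it must either read no
  // input columns or read exactly the column it writes.
  static boolean isIdempotentForReRollup(AggregatorFactory factory)
  {
    return factory.getClass().equals(factory.getCombiningFactory().getClass())
           && (factory.requiredFields().isEmpty()
               || (factory.requiredFields().size() == 1
                   && factory.requiredFields().get(0).equals(factory.getName())));
  }

  public static void main(String[] args)
  {
    // Same input and output column: allowed on rolled-up segments.
    System.out.println(isIdempotentForReRollup(new LongSumAggregatorFactory("added", "added")));     // true
    // Different input and output column: rejected when rolled-up segments are present.
    System.out.println(isIdempotentForReRollup(new LongSumAggregatorFactory("sum_added", "added"))); // false
  }
}
```

An aggregator such as `longSum(sum_added, added)` is therefore rejected when the compaction interval contains rolled-up segments, since re-aggregating into a differently named column would overwrite the already-aggregated values.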
Vishesh Garg 2024-07-30 09:34:46 +05:30 committed by GitHub
parent c7cde31a89
commit e9ea243d97
13 changed files with 571 additions and 232 deletions

View File

@ -20,7 +20,6 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -1357,7 +1356,10 @@ public class ControllerImpl implements Controller
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
private void publishAllSegments(
final Set<DataSegment> segments,
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction
) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) querySpec.getDestination();
@ -1413,7 +1415,7 @@ public class ControllerImpl implements Controller
}
performSegmentPublish(
context.taskActionClient(),
createOverwriteAction(taskLockType, segmentsWithTombstones)
createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones))
);
}
} else if (!segments.isEmpty()) {
@ -1543,6 +1545,7 @@ public class ControllerImpl implements Controller
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
@SuppressWarnings("unchecked")
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
@ -1553,7 +1556,7 @@ public class ControllerImpl implements Controller
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
if (!segments.isEmpty() && storeCompactionState) {
if (storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
@ -1565,20 +1568,21 @@ public class ControllerImpl implements Controller
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
compactionStateAnnotateFunction = addCompactionStateToSegments(
querySpec,
context.jsonMapper(),
dataSchema,
shardSpec,
clusterBy,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
publishAllSegments(segments, compactionStateAnnotateFunction);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@ -1624,33 +1628,49 @@ public class ControllerImpl implements Controller
MSQSpec querySpec,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
@Nullable ShardSpec shardSpec,
@Nullable ClusterBy clusterBy,
String queryId
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
PartitionsSpec partitionSpec;
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
// shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an
// MSQControllerTask.
if (shardSpec != null) {
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimenionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
clusterBy.getColumns()
.stream()
.map(KeyColumn::columnName).collect(Collectors.toList()),
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimenionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}
Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
@ -1671,13 +1691,26 @@ public class ControllerImpl implements Controller
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(),
new TypeReference<List<Object>>() {}
);
List<Object> metricsSpec = Collections.emptyList();
if (querySpec.getQuery() instanceof GroupByQuery) {
// For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting
// in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the original
// AggregatorFactory definition for aggregators in the dataSchema, therefore, directly from the querySpec.
GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery();
// Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false)
// moves metrics columns to dimensions in the final schema.
Set<String> aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(
Collectors.toSet());
metricsSpec = new ArrayList<>(
groupByQuery.getAggregatorSpecs()
.stream()
.filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName()))
.collect(Collectors.toList())
);
}
IndexSpec indexSpec = tuningConfig.getIndexSpec();
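
To illustrate the `metricsSpec` change in this hunk: for group-by ingestion the generated `DataSchema` carries the combining form of each aggregator, so persisting it in `CompactionState` would no longer match the `metricsSpec` that was originally submitted. A minimal sketch, assuming Druid's `LongSumAggregatorFactory` (the column names are hypothetical):

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningFactoryMismatchSketch
{
  public static void main(String[] args)
  {
    // Aggregator as written in the compaction/ingest spec: reads "added", writes "sum_added".
    AggregatorFactory original = new LongSumAggregatorFactory("sum_added", "added");

    // The segment-generation DataSchema holds the combining form, which reads its own output column.
    AggregatorFactory combining = original.getCombiningFactory();

    System.out.println(original.requiredFields());  // [added]
    System.out.println(combining.requiredFields()); // [sum_added]
  }
}
```

This is why `ControllerImpl` now sources `metricsSpec` from the `GroupByQuery`'s own aggregator specs, filtered down to the aggregators present in the `DataSchema`, instead of serializing `dataSchema.getAggregators()` directly.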

View File

@ -49,7 +49,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
@ -58,11 +60,13 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@ -123,7 +127,8 @@ public class MSQCompactionRunner implements CompactionRunner
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@ -139,13 +144,57 @@ public class MSQCompactionRunner implements CompactionRunner
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(CompactionConfigValidationResult.success());
}
/**
* Validates that there are no rolled-up segments where either:
* <ul>
* <li>aggregator factory differs from its combining factory </li>
* <li>input col name is different from the output name (non-idempotent)</li>
* </ul>
*/
private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
{
for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
if (combinedDataSchema.hasRolledUpSegments()) {
for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
// This is a conservative check as existing rollup may have been idempotent but the aggregator provided in
// compaction spec isn't. This would get properly compacted yet fails in the below pre-check.
if (
!(
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
(
aggregatorFactory.requiredFields().isEmpty() ||
(aggregatorFactory.requiredFields().size() == 1 &&
aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName()))
)
)
) {
// MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
// the aggregated column name. This is because the aggregated values would then get overwritten by new
// values and the existing values would be lost. Note that if no rollup is specified in an index spec,
// the default value is true.
return CompactionConfigValidationResult.failure(
"MSQ: Rolled-up segments in compaction interval[%s].",
intervalDataSchema.getKey()
);
}
}
}
}
}
return CompactionConfigValidationResult.success();
}
@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
@ -291,6 +340,10 @@ public class MSQCompactionRunner implements CompactionRunner
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
}
// There can be columns that are part of metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType());
}
return rowSignatureBuilder.build();
}
@ -354,14 +407,30 @@ public class MSQCompactionRunner implements CompactionRunner
private static Query<?> buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema)
{
RowSignature rowSignature = getRowSignature(dataSchema);
return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext())
.build();
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(dataSchema.getDataSource())
.columns(rowSignature.getColumnNames())
.virtualColumns(getVirtualColumns(dataSchema, interval))
.columnTypes(rowSignature.getColumnTypes())
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
.context(compactionTask.getContext());
if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
scanQueryBuilder.orderBy(
orderByColumnSpecs
.stream()
.map(orderByColumnSpec ->
new ScanQuery.OrderBy(
orderByColumnSpec.getDimension(),
ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString())
))
.collect(Collectors.toList())
);
}
return scanQueryBuilder.build();
}
private static boolean isGroupBy(DataSchema dataSchema)
@ -468,7 +537,10 @@ public class MSQCompactionRunner implements CompactionRunner
);
}
// Similar to compaction using the native engine, don't finalize aggregations.
// Used for writing the data schema during segment generation phase.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false);
// Add appropriate finalization to native query context i.e. for the GroupBy query
context.put(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false);
return context;
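
To make the cluster-by fix in `buildScanQuery` concrete: when the tuning config specifies range partitioning, the partition dimensions are now forwarded to the scan query as ascending order-by columns, which is also what the updated `MSQCompactionRunnerTest` asserts. A small sketch of the expected mapping, with hypothetical dimension names:

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.scan.ScanQuery;

import java.util.List;
import java.util.stream.Collectors;

public class ClusterByToOrderBySketch
{
  public static void main(String[] args)
  {
    // Partition dimensions as they would appear in a DimensionRangePartitionsSpec.
    List<String> partitionDimensions = ImmutableList.of("dim1", "dim2");

    // The scan query built by MSQCompactionRunner is expected to order by each
    // partition dimension in ascending order, so the generated segments are clustered.
    List<ScanQuery.OrderBy> orderBys = partitionDimensions
        .stream()
        .map(dim -> new ScanQuery.OrderBy(dim, ScanQuery.Order.ASCENDING))
        .collect(Collectors.toList());

    orderBys.forEach(System.out::println);
  }
}
```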

View File

@ -49,6 +49,7 @@ import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -957,6 +958,13 @@ public class MSQReplaceTest extends MSQTestBase
"Using RangeShardSpec to generate segments."
)
)
.setExpectedLastCompactionState(expectedCompactionState(
queryContext,
Collections.singletonList("dim1"),
Collections.singletonList(new StringDimensionSchema("dim1")),
GranularityType.ALL,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1150,7 +1158,6 @@ public class MSQReplaceTest extends MSQTestBase
+ "CLUSTERED BY dim1"
)
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
@ -1165,6 +1172,19 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{0L, "def", 5.0f, 1L}
)
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.singletonList("dim1"),
Arrays.asList(
new StringDimensionSchema("dim1"),
new FloatDimensionSchema("m1"),
new LongDimensionSchema("cnt")
),
GranularityType.ALL,
Intervals.ETERNITY
)
)
.verifyResults();
}
@ -1571,6 +1591,7 @@ public class MSQReplaceTest extends MSQTestBase
+ "GROUP BY 1, 2 "
+ "PARTITIONED by TIME_FLOOR(__time, 'P3M') "
+ "CLUSTERED by dim1")
.setQueryContext(context)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
@ -1582,6 +1603,16 @@ public class MSQReplaceTest extends MSQTestBase
)
)
.setExpectedResultRows(expectedResults)
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.singletonList("dim1"),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.QUARTER,
Intervals.of("2000-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z")
))
.verifyResults();
}
@ -1652,6 +1683,16 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedResultRows(ImmutableList.of())
.setExpectedTombstoneIntervals(ImmutableSet.of(existingSegmentInterval))
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1688,6 +1729,16 @@ public class MSQReplaceTest extends MSQTestBase
Intervals.of("2016-06-27T01:00:00/2016-06-27T02:00:00")
)
)
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.HOUR,
Intervals.of("2016-06-27T01:00:00/2016-06-27T02:00:00")
))
.verifyResults();
}
@ -1726,6 +1777,16 @@ public class MSQReplaceTest extends MSQTestBase
Intervals.of("2016-06-30T/2016-07-01T")
)
)
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY,
Intervals.of("2016-06-29T00:00:00.000Z/2016-07-03T00:00:00.000Z")
))
.verifyResults();
}
@ -1764,6 +1825,16 @@ public class MSQReplaceTest extends MSQTestBase
Intervals.of("2016-06-02T/2016-06-03T")
)
)
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY,
Intervals.of("2016-05-25T00:00:00.000Z/2016-06-03T00:00:00.000Z")
))
.verifyResults();
}
@ -1802,6 +1873,16 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedResultRows(ImmutableList.of())
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY))
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1838,6 +1919,16 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedResultRows(ImmutableList.of())
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2016-06-01T/2016-09-01T")))
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.ALL,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1874,6 +1965,16 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedResultRows(ImmutableList.of())
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY))
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.ALL,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1906,6 +2007,16 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedResultRows(ImmutableList.of())
.setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY))
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.ALL,
Intervals.ETERNITY
))
.verifyResults();
}
@ -1966,6 +2077,16 @@ public class MSQReplaceTest extends MSQTestBase
Intervals.of("2016-06-02T/2016-06-03T")
)
)
.setExpectedLastCompactionState(expectedCompactionState(
context,
Collections.emptyList(),
ImmutableList.of(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY,
Intervals.of("2016-06-01T00:00:00.000Z/2016-06-03T00:00:00.000Z")
))
.verifyResults();
}
@ -2049,11 +2170,16 @@ public class MSQReplaceTest extends MSQTestBase
}
PartitionsSpec partitionsSpec;
if (partitionDimensions.isEmpty()) {
partitionsSpec = new DynamicPartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, Long.MAX_VALUE);
partitionsSpec = new DynamicPartitionsSpec(
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
Long.MAX_VALUE
);
} else {
partitionsSpec = new DimensionRangePartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, null,
partitionDimensions, false
partitionsSpec = new DimensionRangePartitionsSpec(
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
null,
partitionDimensions,
false
);
}
DimensionsSpec dimensionsSpec = new DimensionsSpec.Builder()

View File

@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.math.expr.ExprMacroTable;
@ -54,10 +55,12 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
@ -73,6 +76,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MSQCompactionRunnerTest
{
@ -127,7 +131,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -140,7 +144,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -153,7 +157,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -166,7 +170,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -179,7 +183,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -192,28 +196,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
{
// Aggregators having different input and ouput column names are unsupported.
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator",
validationResult.getReason()
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
}
@Test
@ -288,6 +271,10 @@ public class MSQCompactionRunnerTest
);
Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY));
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
Assert.assertEquals(PARTITION_DIMENSIONS.stream().map(col -> new ScanQuery.OrderBy(
col,
ScanQuery.Order.ASCENDING
)).collect(Collectors.toList()), ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys());
}
@Test
@ -358,6 +345,48 @@ public class MSQCompactionRunnerTest
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
}
@Test
public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails()
{
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
null,
null,
Collections.emptyMap(),
null,
new AggregatorFactory[]{
new LongSumAggregatorFactory(
outputColName,
inputColName
)
}
);
CombinedDataSchema dataSchema = new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
new UniformGranularitySpec(
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
false,
Collections.singletonList(COMPACTION_INTERVAL)
),
null,
true
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(validationResult.getReason(), StringUtils.format(
"MSQ: Rolled-up segments in compaction interval[%s].",
COMPACTION_INTERVAL
));
}
private CompactionTask createCompactionTask(
@Nullable PartitionsSpec partitionsSpec,
@Nullable DimFilter dimFilter,

View File

@ -57,6 +57,9 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
);
}

View File

@ -77,6 +77,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@ -459,11 +460,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
transformSpec,
metricsSpec,
granularitySpec,
getMetricBuilder()
getMetricBuilder(),
compactionRunner
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
CompactionConfigValidationResult supportsCompactionConfig =
compactionRunner.validateCompactionTask(this, intervalDataSchemas);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
@ -485,7 +488,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final ServiceMetricEvent.Builder metricBuilder
final ServiceMetricEvent.Builder metricBuilder,
CompactionRunner compactionRunner
) throws IOException
{
final Iterable<DataSegment> timelineSegments = retrieveRelevantTimelineHolders(
@ -549,7 +553,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse),
compactionRunner
);
intervalDataSchemaMap.put(interval, dataSchema);
}
@ -574,7 +579,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec
granularitySpec,
compactionRunner
);
return Collections.singletonMap(segmentProvider.interval, dataSchema);
}
@ -604,13 +610,17 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable CompactionRunner compactionRunner
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
granularitySpec.isRollup() == null,
// For MSQ, always need rollup to check if there are some rollup segments already present.
compactionRunner instanceof NativeCompactionRunner
? (granularitySpec.isRollup() == null)
: true,
granularitySpec.getQueryGranularity() == null,
dimensionsSpec == null,
metricsSpec == null
@ -665,13 +675,14 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
finalMetricsSpec = metricsSpec;
}
return new DataSchema(
return new CombinedDataSchema(
dataSource,
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null),
existingSegmentAnalyzer.hasRolledUpSegments()
);
}
@ -748,6 +759,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// For processRollup:
private boolean rollup = true;
private boolean hasRolledUpSegments = false;
// For processQueryGranularity:
private Granularity queryGranularity;
@ -815,6 +827,11 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
return rollup;
}
public boolean hasRolledUpSegments()
{
return hasRolledUpSegments;
}
public Granularity getQueryGranularity()
{
if (!needQueryGranularity) {
@ -904,6 +921,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
}
private void processQueryGranularity(final QueryableIndex index)
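
For clarity on the two rollup flags tracked above: `rollup` stays true only if every analyzed segment is rolled up, while the new `hasRolledUpSegments` flips to true as soon as any segment is. A standalone sketch of that fold, using a hypothetical mix of per-segment metadata values:

```java
import java.util.Arrays;
import java.util.List;

public class RollupFlagsSketch
{
  public static void main(String[] args)
  {
    // Hypothetical Metadata.isRollup() values for three segments being compacted:
    // rolled-up, not rolled-up, and unknown (null).
    List<Boolean> isRollupPerSegment = Arrays.asList(true, false, null);

    boolean rollup = true;               // true only if all segments are rolled up
    boolean hasRolledUpSegments = false; // true if at least one segment is rolled up

    for (Boolean isIndexRollup : isRollupPerSegment) {
      rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
      hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
    }

    System.out.println(rollup);              // false
    System.out.println(hasRolledUpSegments); // true
  }
}
```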

View File

@ -85,7 +85,8 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
return CompactionConfigValidationResult.success();

View File

@ -749,7 +749,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -810,7 +811,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -872,7 +874,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -935,7 +938,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1005,7 +1009,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1055,7 +1060,8 @@ public class CompactionTaskTest
null,
customMetricsSpec,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1098,7 +1104,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1148,7 +1155,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
NativeCompactionRunner.createIngestionSpecs(
@ -1178,7 +1186,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
NativeCompactionRunner.createIngestionSpecs(
@ -1219,7 +1228,8 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1263,7 +1273,8 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
dataSchemasForIntervals,
@ -1308,7 +1319,8 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
null
),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1355,7 +1367,8 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1400,7 +1413,8 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1445,7 +1459,8 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1475,7 +1490,8 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
METRIC_BUILDER
METRIC_BUILDER,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(

View File

@ -28,10 +28,10 @@ import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
@ -328,8 +328,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception
{
// added = 31, count = null, sum_added = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
@ -356,7 +356,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
false
false,
engine
);
// should now only have 1 row after compaction
// added = null, count = 2, sum_added = 62
@ -559,8 +560,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -571,9 +572,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
// Dummy compaction config which will be overwritten
submitCompactionConfig(10000, NO_SKIP_OFFSET);
submitCompactionConfig(10000, NO_SKIP_OFFSET, engine);
// New compaction config should overwrites the existing compaction config
submitCompactionConfig(1, NO_SKIP_OFFSET);
submitCompactionConfig(1, NO_SKIP_OFFSET, engine);
LOG.info("Auto compaction test with dynamic partitioning");
@ -584,25 +585,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCompacted(10, 1);
checkCompactionIntervals(intervalsBeforeCompaction);
LOG.info("Auto compaction test with hash partitioning");
if (engine == CompactionEngine.NATIVE) {
// HashedPartitionsSpec not supported by MSQ.
LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(hashedPartitionsSpec, 4);
checkCompactionIntervals(intervalsBeforeCompaction);
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(hashedPartitionsSpec, 4);
checkCompactionIntervals(intervalsBeforeCompaction);
}
LOG.info("Auto compaction test with range partitioning");
final SingleDimensionPartitionsSpec rangePartitionsSpec = new SingleDimensionPartitionsSpec(
final DimensionRangePartitionsSpec rangePartitionsSpec = new DimensionRangePartitionsSpec(
5,
null,
"city",
ImmutableList.of("city"),
false
);
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false);
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
@ -695,8 +699,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue(CompactionEngine engine) throws Exception
{
// Interval is "2013-08-31/2013-09-02", segment gran is DAY,
// "maxRowsPerSegment": 3
@ -732,7 +736,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
true,
engine
);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
@ -750,7 +760,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
newGranularity = Granularities.MONTH;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
true,
engine
);
// Since dropExisting is set to true...
// Again data is only in two days
@ -778,7 +794,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// compact only tombstones, so it should be a tombstone itself.
newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
true,
engine
);
// Since dropExisting is set to true...
// The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the
@ -804,8 +826,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse(CompactionEngine engine) throws Exception
{
// Interval is "2013-08-31/2013-09-02", segment gran is DAY,
// "maxRowsPerSegment": 3
@ -841,7 +863,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
@ -859,7 +881,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
newGranularity = Granularities.MONTH;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
// Since dropExisting is set to true...
// Again data is only in two days
@ -885,9 +907,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones");
newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity,
null, null
), false);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
false,
engine
);
// Since dropExisting is set to false the first semester will be forced to dropExisting true
// Hence, we will have two, one tombstone for the first semester and one data segment for the second.
@ -963,8 +989,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -974,14 +1000,14 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), engine);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine);
LOG.info("Auto compaction test with YEAR segment granularity");
@ -1001,8 +1027,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -1013,7 +1039,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
// Compacted without SegmentGranularity in auto compaction config
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -1023,7 +1049,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.DAY;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -1033,8 +1059,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -1045,7 +1071,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
// Compacted without SegmentGranularity in auto compaction config
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -1055,7 +1081,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine);
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -1066,8 +1092,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue(CompactionEngine engine) throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -1079,7 +1105,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will still have one visible segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@ -1109,7 +1135,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
coordinator.getSegmentIntervals(fullDatasourceName);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
// verify:
expectedIntervalAfterCompaction = new ArrayList<>();
// The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be
@ -1245,8 +1271,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment(CompactionEngine engine) throws Exception
{
updateCompactionTaskSlot(1, 1, null);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
@ -1263,7 +1289,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null),
false
false,
engine
);
// Before compaction, we have segments with the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z
// We will compact the latest segment to MONTH.
@ -1292,8 +1319,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithRollup() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@ -1309,7 +1336,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
false
false,
engine
);
forceTriggerAutoCompaction(2);
queryAndResultFields = ImmutableMap.of(
@ -1328,8 +1356,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithQueryGranularity() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithQueryGranularity(CompactionEngine engine) throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@ -1345,7 +1373,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null),
false
false,
engine
);
forceTriggerAutoCompaction(2);
queryAndResultFields = ImmutableMap.of(
@ -1364,8 +1393,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithDimensionsSpec() throws Exception
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) throws Exception
{
// Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous",
// "namespace", "continent", "country", "region", "city"
@ -1392,7 +1421,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
null,
false
false,
engine
);
forceTriggerAutoCompaction(2);
@ -1760,30 +1790,6 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
);
}
private void submitCompactionConfig(
PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks,
UserCompactionTaskGranularityConfig granularitySpec,
UserCompactionTaskDimensionsConfig dimensionsSpec,
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting
) throws Exception
{
submitCompactionConfig(
partitionsSpec,
skipOffsetFromLatest,
maxNumConcurrentSubTasks,
granularitySpec,
dimensionsSpec,
transformSpec,
metricsSpec,
dropExisting,
null
);
}
private void submitCompactionConfig(
PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest,
@ -1829,7 +1835,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
transformSpec,
!dropExisting ? null : new UserCompactionTaskIOConfig(true),
engine,
null
ImmutableMap.of("maxNumTasks", 2)
);
compactionResource.submitCompactionConfig(compactionConfig);

View File

@ -32,7 +32,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -121,7 +120,6 @@ public class ClientCompactionRunnerInfo
));
}
validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext()));
validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
@ -183,28 +181,4 @@ public class ClientCompactionRunnerInfo
}
return CompactionConfigValidationResult.success();
}
/**
* Validate each metric has output column name same as the input name.
*/
public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
if (metricsSpec == null) {
return CompactionConfigValidationResult.success();
}
return Arrays.stream(metricsSpec)
.filter(aggregatorFactory ->
!(aggregatorFactory.requiredFields().isEmpty()
|| aggregatorFactory.requiredFields().size() == 1
&& aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName())))
.findFirst()
.map(aggregatorFactory ->
CompactionConfigValidationResult.failure(
"MSQ: Different name[%s] and fieldName(s)[%s] for aggregator",
aggregatorFactory.getName(),
aggregatorFactory.requiredFields()
)).orElse(CompactionConfigValidationResult.success());
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.indexing;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import javax.annotation.Nullable;
/**
* Class representing the combined DataSchema of a set of segments, currently used only by Compaction.
*/
public class CombinedDataSchema extends DataSchema
{
private final boolean hasRolledUpSegments;
public CombinedDataSchema(
String dataSource,
@Nullable TimestampSpec timestampSpec,
@Nullable DimensionsSpec dimensionsSpec,
AggregatorFactory[] aggregators,
GranularitySpec granularitySpec,
TransformSpec transformSpec,
@Nullable boolean hasRolledUpSegments
)
{
super(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
null,
null
);
this.hasRolledUpSegments = hasRolledUpSegments;
}
public boolean hasRolledUpSegments()
{
return hasRolledUpSegments;
}
}

View File

@ -129,7 +129,7 @@ public class ClientCompactionRunnerInfoTest
}
@Test
public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid()
public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
@ -148,29 +148,6 @@ public class ClientCompactionRunnerInfoTest
);
}
@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
{
// Aggregators having different input and ouput column names are unsupported.
final String inputColName = "added";
final String outputColName = "sum_added";
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator",
validationResult.getReason()
);
}
@Test
public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
{

View File

@ -684,4 +684,24 @@ public class DataSchemaTest extends InitializedNullHandlingTest
Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
}
@Test
public void testCombinedDataSchemaSetsHasRolledUpSegments()
{
CombinedDataSchema schema = new CombinedDataSchema(
IdUtilsTest.VALID_ID_CHARS,
new TimestampSpec("time", "auto", null),
DimensionsSpec.builder()
.setDimensions(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1"))
)
.setDimensionExclusions(ImmutableList.of("dimC"))
.build(),
null,
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
true
);
Assert.assertTrue(schema.hasRolledUpSegments());
}
}