From e9ea243d972f5452926bb6241ac56bc36697f237 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 30 Jul 2024 09:34:46 +0530 Subject: [PATCH] Enable compaction ITs on MSQ engine (#16778) Follow-up to #16291, this commit enables a subset of existing native compaction ITs on the MSQ engine. In the process, the following changes have been introduced in the MSQ compaction flow: - Populate `metricsSpec` in `CompactionState` from `querySpec` in `MSQControllerTask` instead of `dataSchema` - Add check for pre-rolled-up segments having `AggregatorFactory` with different input and output column names - Fix passing missing cluster-by clause in scan queries - Add annotation of `CompactionState` to tombstone segments --- .../apache/druid/msq/exec/ControllerImpl.java | 91 ++++++--- .../msq/indexing/MSQCompactionRunner.java | 92 ++++++++- .../apache/druid/msq/exec/MSQReplaceTest.java | 136 +++++++++++++- .../msq/indexing/MSQCompactionRunnerTest.java | 83 ++++++--- .../common/task/CompactionRunner.java | 5 +- .../indexing/common/task/CompactionTask.java | 36 +++- .../common/task/NativeCompactionRunner.java | 3 +- .../common/task/CompactionTaskTest.java | 48 +++-- .../duty/ITAutoCompactionTest.java | 174 +++++++++--------- .../indexing/ClientCompactionRunnerInfo.java | 26 --- .../segment/indexing/CombinedDataSchema.java | 64 +++++++ .../ClientCompactionRunnerInfoTest.java | 25 +-- .../segment/indexing/DataSchemaTest.java | 20 ++ 13 files changed, 571 insertions(+), 232 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index a30e9686087..d2d5cc657e6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -1357,7 +1356,10 @@ public class ControllerImpl implements Controller * Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()}, * also drop all other segments within the replacement intervals. */ - private void publishAllSegments(final Set segments) throws IOException + private void publishAllSegments( + final Set segments, + Function, Set> compactionStateAnnotateFunction + ) throws IOException { final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); @@ -1413,7 +1415,7 @@ public class ControllerImpl implements Controller } performSegmentPublish( context.taskActionClient(), - createOverwriteAction(taskLockType, segmentsWithTombstones) + createOverwriteAction(taskLockType, compactionStateAnnotateFunction.apply(segmentsWithTombstones)) ); } } else if (!segments.isEmpty()) { @@ -1543,6 +1545,7 @@ public class ControllerImpl implements Controller if (MSQControllerTask.isIngestion(querySpec)) { // Publish segments if needed. final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); + Function, Set> compactionStateAnnotateFunction = Function.identity(); @SuppressWarnings("unchecked") Set segments = (Set) queryKernel.getResultObjectForStage(finalStageId); @@ -1553,7 +1556,7 @@ public class ControllerImpl implements Controller Tasks.DEFAULT_STORE_COMPACTION_STATE ); - if (!segments.isEmpty() && storeCompactionState) { + if (storeCompactionState) { DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); if (!destination.isReplaceTimeChunks()) { // Store compaction state only for replace queries. @@ -1565,20 +1568,21 @@ public class ControllerImpl implements Controller DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema(); - ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec(); + ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec(); + ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy(); - Function, Set> compactionStateAnnotateFunction = addCompactionStateToSegments( + compactionStateAnnotateFunction = addCompactionStateToSegments( querySpec, context.jsonMapper(), dataSchema, shardSpec, + clusterBy, queryDef.getQueryId() ); - segments = compactionStateAnnotateFunction.apply(segments); } } log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); - publishAllSegments(segments); + publishAllSegments(segments, compactionStateAnnotateFunction); } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); @@ -1624,33 +1628,49 @@ public class ControllerImpl implements Controller MSQSpec querySpec, ObjectMapper jsonMapper, DataSchema dataSchema, - ShardSpec shardSpec, + @Nullable ShardSpec shardSpec, + @Nullable ClusterBy clusterBy, String queryId ) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); PartitionsSpec partitionSpec; - if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) { - List partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an + // MSQControllerTask. + if (shardSpec != null) { + if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) { + List partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions(); + partitionSpec = new DimensionRangePartitionsSpec( + tuningConfig.getRowsPerSegment(), + null, + partitionDimensions, + false + ); + } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { + // MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE. + partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); + } else { + // SingleDimenionShardSpec and other shard specs are never created in MSQ. + throw new MSQException( + UnknownFault.forMessage( + StringUtils.format( + "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].", + queryId, + shardSpec.getType() + ))); + } + } else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) { partitionSpec = new DimensionRangePartitionsSpec( tuningConfig.getRowsPerSegment(), null, - partitionDimensions, + clusterBy.getColumns() + .stream() + .map(KeyColumn::columnName).collect(Collectors.toList()), false ); - } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) { - // MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE. - partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); } else { - // SingleDimenionShardSpec and other shard specs are never created in MSQ. - throw new MSQException( - UnknownFault.forMessage( - StringUtils.format( - "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].", - queryId, - shardSpec.getType() - ))); + partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); } Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination()) @@ -1671,13 +1691,26 @@ public class ControllerImpl implements Controller : new ClientCompactionTaskTransformSpec( dataSchema.getTransformSpec().getFilter() ).asMap(jsonMapper); - List metricsSpec = dataSchema.getAggregators() == null - ? null - : jsonMapper.convertValue( - dataSchema.getAggregators(), - new TypeReference>() {} - ); + List metricsSpec = Collections.emptyList(); + if (querySpec.getQuery() instanceof GroupByQuery) { + // For group-by queries, the aggregators are transformed to their combining factories in the dataschema, resulting + // in a mismatch between schema in compaction spec and the one in compaction state. Sourcing the original + // AggregatorFactory definition for aggregators in the dataSchema, therefore, directly from the querySpec. + GroupByQuery groupByQuery = (GroupByQuery) querySpec.getQuery(); + // Collect all aggregators that are part of the current dataSchema, since a non-rollup query (isRollup() is false) + // moves metrics columns to dimensions in the final schema. + Set aggregatorsInDataSchema = Arrays.stream(dataSchema.getAggregators()) + .map(AggregatorFactory::getName) + .collect( + Collectors.toSet()); + metricsSpec = new ArrayList<>( + groupByQuery.getAggregatorSpecs() + .stream() + .filter(aggregatorFactory -> aggregatorsInDataSchema.contains(aggregatorFactory.getName())) + .collect(Collectors.toList()) + ); + } IndexSpec indexSpec = tuningConfig.getIndexSpec(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 28ec422e2ac..7b4d3235dc0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -49,7 +49,9 @@ import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.expression.TimestampFloorExprMacro; @@ -58,11 +60,13 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; @@ -123,7 +127,8 @@ public class MSQCompactionRunner implements CompactionRunner */ @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalToDataSchemaMap ) { List validationResults = new ArrayList<>(); @@ -139,13 +144,57 @@ public class MSQCompactionRunner implements CompactionRunner )); } validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext())); - validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec())); + validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap)); return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() .orElse(CompactionConfigValidationResult.success()); } + /** + * Valides that there are no rolled-up segments where either: + *
    + *
  • aggregator factory differs from its combining factory
  • + *
  • input col name is different from the output name (non-idempotent)
  • + *
+ */ + private CompactionConfigValidationResult validateRolledUpSegments(Map intervalToDataSchemaMap) + { + for (Map.Entry intervalDataSchema : intervalToDataSchemaMap.entrySet()) { + if (intervalDataSchema.getValue() instanceof CombinedDataSchema) { + CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue(); + if (combinedDataSchema.hasRolledUpSegments()) { + for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) { + // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in + // compaction spec isn't. This would get properly compacted yet fails in the below pre-check. + if ( + !( + aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) && + ( + aggregatorFactory.requiredFields().isEmpty() || + (aggregatorFactory.requiredFields().size() == 1 && + aggregatorFactory.requiredFields() + .get(0) + .equals(aggregatorFactory.getName())) + ) + ) + ) { + // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from + // the aggregated column name. This is because the aggregated values would then get overwritten by new + // values and the existing values would be lost. Note that if no rollup is specified in an index spec, + // the default value is true. + return CompactionConfigValidationResult.failure( + "MSQ: Rolled-up segments in compaction interval[%s].", + intervalDataSchema.getKey() + ); + } + } + } + } + } + return CompactionConfigValidationResult.success(); + } + @Override public CurrentSubTaskHolder getCurrentSubTaskHolder() { @@ -291,6 +340,10 @@ public class MSQCompactionRunner implements CompactionRunner for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) { rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName())); } + // There can be columns that are part of metricsSpec for a datasource. + for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) { + rowSignatureBuilder.add(aggregatorFactory.getName(), aggregatorFactory.getIntermediateType()); + } return rowSignatureBuilder.build(); } @@ -354,14 +407,30 @@ public class MSQCompactionRunner implements CompactionRunner private static Query buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) { RowSignature rowSignature = getRowSignature(dataSchema); - return new Druids.ScanQueryBuilder().dataSource(dataSchema.getDataSource()) - .columns(rowSignature.getColumnNames()) - .virtualColumns(getVirtualColumns(dataSchema, interval)) - .columnTypes(rowSignature.getColumnTypes()) - .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) - .filters(dataSchema.getTransformSpec().getFilter()) - .context(compactionTask.getContext()) - .build(); + Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder() + .dataSource(dataSchema.getDataSource()) + .columns(rowSignature.getColumnNames()) + .virtualColumns(getVirtualColumns(dataSchema, interval)) + .columnTypes(rowSignature.getColumnTypes()) + .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) + .filters(dataSchema.getTransformSpec().getFilter()) + .context(compactionTask.getContext()); + + if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { + List orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); + + scanQueryBuilder.orderBy( + orderByColumnSpecs + .stream() + .map(orderByColumnSpec -> + new ScanQuery.OrderBy( + orderByColumnSpec.getDimension(), + ScanQuery.Order.fromString(orderByColumnSpec.getDirection().toString()) + )) + .collect(Collectors.toList()) + ); + } + return scanQueryBuilder.build(); } private static boolean isGroupBy(DataSchema dataSchema) @@ -468,7 +537,10 @@ public class MSQCompactionRunner implements CompactionRunner ); } // Similar to compaction using the native engine, don't finalize aggregations. + // Used for writing the data schema during segment generation phase. context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS, false); + // Add appropriate finalization to native query context i.e. for the GroupBy query + context.put(QueryContexts.FINALIZE_KEY, false); // Only scalar or array-type dimensions are allowed as grouping keys. context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING, false); return context; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index f6eb80b3282..1e8dc474f6e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -49,6 +49,7 @@ import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestTaskActionClient; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -957,6 +958,13 @@ public class MSQReplaceTest extends MSQTestBase "Using RangeShardSpec to generate segments." ) ) + .setExpectedLastCompactionState(expectedCompactionState( + queryContext, + Collections.singletonList("dim1"), + Collections.singletonList(new StringDimensionSchema("dim1")), + GranularityType.ALL, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1150,7 +1158,6 @@ public class MSQReplaceTest extends MSQTestBase + "CLUSTERED BY dim1" ) .setExpectedDataSource("foo1") - .setQueryContext(DEFAULT_MSQ_CONTEXT) .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedRowSignature(rowSignature) .setQueryContext(context) @@ -1165,6 +1172,19 @@ public class MSQReplaceTest extends MSQTestBase new Object[]{0L, "def", 5.0f, 1L} ) ) + .setExpectedLastCompactionState( + expectedCompactionState( + context, + Collections.singletonList("dim1"), + Arrays.asList( + new StringDimensionSchema("dim1"), + new FloatDimensionSchema("m1"), + new LongDimensionSchema("cnt") + ), + GranularityType.ALL, + Intervals.ETERNITY + ) + ) .verifyResults(); } @@ -1571,6 +1591,7 @@ public class MSQReplaceTest extends MSQTestBase + "GROUP BY 1, 2 " + "PARTITIONED by TIME_FLOOR(__time, 'P3M') " + "CLUSTERED by dim1") + .setQueryContext(context) .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) .setExpectedShardSpec(DimensionRangeShardSpec.class) @@ -1582,6 +1603,16 @@ public class MSQReplaceTest extends MSQTestBase ) ) .setExpectedResultRows(expectedResults) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.singletonList("dim1"), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.QUARTER, + Intervals.of("2000-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z") + )) .verifyResults(); } @@ -1652,6 +1683,16 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedResultRows(ImmutableList.of()) .setExpectedTombstoneIntervals(ImmutableSet.of(existingSegmentInterval)) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.DAY, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1688,6 +1729,16 @@ public class MSQReplaceTest extends MSQTestBase Intervals.of("2016-06-27T01:00:00/2016-06-27T02:00:00") ) ) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.HOUR, + Intervals.of("2016-06-27T01:00:00/2016-06-27T02:00:00") + )) .verifyResults(); } @@ -1726,6 +1777,16 @@ public class MSQReplaceTest extends MSQTestBase Intervals.of("2016-06-30T/2016-07-01T") ) ) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.DAY, + Intervals.of("2016-06-29T00:00:00.000Z/2016-07-03T00:00:00.000Z") + )) .verifyResults(); } @@ -1764,6 +1825,16 @@ public class MSQReplaceTest extends MSQTestBase Intervals.of("2016-06-02T/2016-06-03T") ) ) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.DAY, + Intervals.of("2016-05-25T00:00:00.000Z/2016-06-03T00:00:00.000Z") + )) .verifyResults(); } @@ -1802,6 +1873,16 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedResultRows(ImmutableList.of()) .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY)) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.DAY, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1838,6 +1919,16 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedResultRows(ImmutableList.of()) .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2016-06-01T/2016-09-01T"))) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.ALL, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1874,6 +1965,16 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedResultRows(ImmutableList.of()) .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY)) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.ALL, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1906,6 +2007,16 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedDataSource("foo1") .setExpectedResultRows(ImmutableList.of()) .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.ETERNITY)) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.ALL, + Intervals.ETERNITY + )) .verifyResults(); } @@ -1966,6 +2077,16 @@ public class MSQReplaceTest extends MSQTestBase Intervals.of("2016-06-02T/2016-06-03T") ) ) + .setExpectedLastCompactionState(expectedCompactionState( + context, + Collections.emptyList(), + ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("cnt") + ), + GranularityType.DAY, + Intervals.of("2016-06-01T00:00:00.000Z/2016-06-03T00:00:00.000Z") + )) .verifyResults(); } @@ -2049,11 +2170,16 @@ public class MSQReplaceTest extends MSQTestBase } PartitionsSpec partitionsSpec; if (partitionDimensions.isEmpty()) { - partitionsSpec = new DynamicPartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, Long.MAX_VALUE); - + partitionsSpec = new DynamicPartitionsSpec( + MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)), + Long.MAX_VALUE + ); } else { - partitionsSpec = new DimensionRangePartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, null, - partitionDimensions, false + partitionsSpec = new DimensionRangePartitionsSpec( + MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)), + null, + partitionDimensions, + false ); } DimensionsSpec dimensionsSpec = new DimensionsSpec.Builder() diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 3f73a9b513e..6c5d1957265 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.TuningConfigBuilder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.math.expr.ExprMacroTable; @@ -54,10 +55,12 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.TransformSpec; @@ -73,6 +76,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class MSQCompactionRunnerTest { @@ -127,7 +131,7 @@ public class MSQCompactionRunnerTest null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -140,7 +144,7 @@ public class MSQCompactionRunnerTest null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -153,7 +157,7 @@ public class MSQCompactionRunnerTest null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -166,7 +170,7 @@ public class MSQCompactionRunnerTest null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -179,7 +183,7 @@ public class MSQCompactionRunnerTest new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null), null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -192,28 +196,7 @@ public class MSQCompactionRunnerTest new ClientCompactionTaskGranularitySpec(null, null, false), AGGREGATORS.toArray(new AggregatorFactory[0]) ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); - } - - @Test - public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() - { - // Aggregators having different input and ouput column names are unsupported. - final String inputColName = "added"; - final String outputColName = "sum_added"; - CompactionTask compactionTask = createCompactionTask( - new DynamicPartitionsSpec(3, null), - null, - Collections.emptyMap(), - new ClientCompactionTaskGranularitySpec(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} - ); - CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask); - Assert.assertFalse(validationResult.isValid()); - Assert.assertEquals( - "MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator", - validationResult.getReason() - ); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid()); } @Test @@ -288,6 +271,10 @@ public class MSQCompactionRunnerTest ); Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY)); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); + Assert.assertEquals(PARTITION_DIMENSIONS.stream().map(col -> new ScanQuery.OrderBy( + col, + ScanQuery.Order.ASCENDING + )).collect(Collectors.toList()), ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys()); } @Test @@ -358,6 +345,48 @@ public class MSQCompactionRunnerTest Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); } + @Test + public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails() + { + final String inputColName = "added"; + final String outputColName = "sum_added"; + CompactionTask compactionTask = createCompactionTask( + null, + null, + Collections.emptyMap(), + null, + new AggregatorFactory[]{ + new LongSumAggregatorFactory( + outputColName, + inputColName + ) + } + ); + CombinedDataSchema dataSchema = new CombinedDataSchema( + DATA_SOURCE, + new TimestampSpec(TIMESTAMP_COLUMN, null, null), + new DimensionsSpec(DIMENSIONS), + new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}, + new UniformGranularitySpec( + SEGMENT_GRANULARITY.getDefaultGranularity(), + null, + false, + Collections.singletonList(COMPACTION_INTERVAL) + ), + null, + true + ); + CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask( + compactionTask, + Collections.singletonMap(COMPACTION_INTERVAL, dataSchema) + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals(validationResult.getReason(), StringUtils.format( + "MSQ: Rolled-up segments in compaction interval[%s].", + COMPACTION_INTERVAL + )); + } + private CompactionTask createCompactionTask( @Nullable PartitionsSpec partitionsSpec, @Nullable DimFilter dimFilter, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java index 8d30a60d04e..0abaeed8eb2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java @@ -57,6 +57,9 @@ public interface CompactionRunner * Checks if the provided compaction config is supported by the runner. * The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask} */ - CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask); + CompactionConfigValidationResult validateCompactionTask( + CompactionTask compactionTask, + Map intervalToDataSchemaMap + ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 8659eb0f397..68320387845 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -77,6 +77,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -459,11 +460,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg transformSpec, metricsSpec, granularitySpec, - getMetricBuilder() + getMetricBuilder(), + compactionRunner ); registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder()); - CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this); + CompactionConfigValidationResult supportsCompactionConfig = + compactionRunner.validateCompactionTask(this, intervalDataSchemas); if (!supportsCompactionConfig.isValid()) { throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason()); } @@ -485,7 +488,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg @Nullable final ClientCompactionTaskTransformSpec transformSpec, @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, - final ServiceMetricEvent.Builder metricBuilder + final ServiceMetricEvent.Builder metricBuilder, + CompactionRunner compactionRunner ) throws IOException { final Iterable timelineSegments = retrieveRelevantTimelineHolders( @@ -549,7 +553,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg metricsSpec, granularitySpec == null ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null) - : granularitySpec.withSegmentGranularity(segmentGranularityToUse) + : granularitySpec.withSegmentGranularity(segmentGranularityToUse), + compactionRunner ); intervalDataSchemaMap.put(interval, dataSchema); } @@ -574,7 +579,8 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg dimensionsSpec, transformSpec, metricsSpec, - granularitySpec + granularitySpec, + compactionRunner ); return Collections.singletonMap(segmentProvider.interval, dataSchema); } @@ -604,13 +610,17 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg @Nullable DimensionsSpec dimensionsSpec, @Nullable ClientCompactionTaskTransformSpec transformSpec, @Nullable AggregatorFactory[] metricsSpec, - @Nonnull ClientCompactionTaskGranularitySpec granularitySpec + @Nonnull ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable CompactionRunner compactionRunner ) { // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer( segments, - granularitySpec.isRollup() == null, + // For MSQ, always need rollup to check if there are some rollup segments already present. + compactionRunner instanceof NativeCompactionRunner + ? (granularitySpec.isRollup() == null) + : true, granularitySpec.getQueryGranularity() == null, dimensionsSpec == null, metricsSpec == null @@ -665,13 +675,14 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg finalMetricsSpec = metricsSpec; } - return new DataSchema( + return new CombinedDataSchema( dataSource, new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), finalDimensionsSpec, finalMetricsSpec, uniformGranularitySpec, - transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null) + transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null), + existingSegmentAnalyzer.hasRolledUpSegments() ); } @@ -748,6 +759,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg // For processRollup: private boolean rollup = true; + private boolean hasRolledUpSegments = false; // For processQueryGranularity: private Granularity queryGranularity; @@ -815,6 +827,11 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg return rollup; } + public boolean hasRolledUpSegments() + { + return hasRolledUpSegments; + } + public Granularity getQueryGranularity() { if (!needQueryGranularity) { @@ -904,6 +921,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false final Boolean isIndexRollup = index.getMetadata().isRollup(); rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup); + hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup); } private void processQueryGranularity(final QueryableIndex index) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java index 2074d14f0f9..5aa7af71451 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java @@ -85,7 +85,8 @@ public class NativeCompactionRunner implements CompactionRunner @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalToDataSchemaMap ) { return CompactionConfigValidationResult.success(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f9849b1483d..3a386bc4aa7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -749,7 +749,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -810,7 +811,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -872,7 +874,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -935,7 +938,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1005,7 +1009,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1055,7 +1060,8 @@ public class CompactionTaskTest null, customMetricsSpec, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1098,7 +1104,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1148,7 +1155,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); NativeCompactionRunner.createIngestionSpecs( @@ -1178,7 +1186,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); NativeCompactionRunner.createIngestionSpecs( @@ -1219,7 +1228,8 @@ public class CompactionTaskTest null, null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1263,7 +1273,8 @@ public class CompactionTaskTest null, null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( dataSchemasForIntervals, @@ -1308,7 +1319,8 @@ public class CompactionTaskTest new PeriodGranularity(Period.months(3), null, null), null ), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1355,7 +1367,8 @@ public class CompactionTaskTest null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1400,7 +1413,8 @@ public class CompactionTaskTest null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1445,7 +1459,8 @@ public class CompactionTaskTest null, null, new ClientCompactionTaskGranularitySpec(null, null, true), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1475,7 +1490,8 @@ public class CompactionTaskTest null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + null ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 230a19236c1..e0400d3cce2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -28,10 +28,10 @@ import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.task.CompactionIntervalSpec; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.overlord.http.TaskPayloadResponse; @@ -328,8 +328,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception { // added = 31, count = null, sum_added = null loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); @@ -356,7 +356,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), null, new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")}, - false + false, + engine ); // should now only have 1 row after compaction // added = null, count = 2, sum_added = 62 @@ -559,8 +560,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -571,9 +572,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifyQuery(INDEX_QUERIES_RESOURCE); // Dummy compaction config which will be overwritten - submitCompactionConfig(10000, NO_SKIP_OFFSET); + submitCompactionConfig(10000, NO_SKIP_OFFSET, engine); // New compaction config should overwrites the existing compaction config - submitCompactionConfig(1, NO_SKIP_OFFSET); + submitCompactionConfig(1, NO_SKIP_OFFSET, engine); LOG.info("Auto compaction test with dynamic partitioning"); @@ -584,25 +585,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCompacted(10, 1); checkCompactionIntervals(intervalsBeforeCompaction); - LOG.info("Auto compaction test with hash partitioning"); + if (engine == CompactionEngine.NATIVE) { + // HashedPartitionsSpec not supported by MSQ. + LOG.info("Auto compaction test with hash partitioning"); - final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); - submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false); - // 2 segments published per day after compaction. - forceTriggerAutoCompaction(4); - verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(hashedPartitionsSpec, 4); - checkCompactionIntervals(intervalsBeforeCompaction); + final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); + submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); + // 2 segments published per day after compaction. + forceTriggerAutoCompaction(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(hashedPartitionsSpec, 4); + checkCompactionIntervals(intervalsBeforeCompaction); + } LOG.info("Auto compaction test with range partitioning"); - final SingleDimensionPartitionsSpec rangePartitionsSpec = new SingleDimensionPartitionsSpec( + final DimensionRangePartitionsSpec rangePartitionsSpec = new DimensionRangePartitionsSpec( 5, null, - "city", + ImmutableList.of("city"), false ); - submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false); + submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(rangePartitionsSpec, 2); @@ -695,8 +699,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue(CompactionEngine engine) throws Exception { // Interval is "2013-08-31/2013-09-02", segment gran is DAY, // "maxRowsPerSegment": 3 @@ -732,7 +736,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); List expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { @@ -750,7 +760,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", newGranularity = Granularities.MONTH; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); // Since dropExisting is set to true... // Again data is only in two days @@ -778,7 +794,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // compact only tombstones, so it should be a tombstone itself. newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + true, + engine + ); // Since dropExisting is set to true... // The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the @@ -804,8 +826,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse(CompactionEngine engine) throws Exception { // Interval is "2013-08-31/2013-09-02", segment gran is DAY, // "maxRowsPerSegment": 3 @@ -841,7 +863,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); List expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { @@ -859,7 +881,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", newGranularity = Granularities.MONTH; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); // Since dropExisting is set to true... // Again data is only in two days @@ -885,9 +907,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones"); newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); // Set dropExisting to false - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, - null, null - ), false); + submitCompactionConfig( + 1000, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(newGranularity, null, null), + false, + engine + ); // Since dropExisting is set to false the first semester will be forced to dropExisting true // Hence, we will have two, one tombstone for the first semester and one data segment for the second. @@ -963,8 +989,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -974,14 +1000,14 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), engine); //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total) forceTriggerAutoCompaction(3); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -1001,8 +1027,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveSameSegmentGranularity(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1013,7 +1039,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifyQuery(INDEX_QUERIES_RESOURCE); // Compacted without SegmentGranularity in auto compaction config - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1023,7 +1049,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. // Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.DAY; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1033,8 +1059,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndExistingCompactedSegmentsHaveDifferentSegmentGranularity(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1045,7 +1071,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifyQuery(INDEX_QUERIES_RESOURCE); // Compacted without SegmentGranularity in auto compaction config - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, engine); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1055,7 +1081,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. // Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), engine); forceTriggerAutoCompaction(1); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -1066,8 +1092,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue(CompactionEngine engine) throws Exception { loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1079,7 +1105,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); List expectedIntervalAfterCompaction = new ArrayList<>(); // We will still have one visible segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) @@ -1109,7 +1135,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest coordinator.getSegmentIntervals(fullDatasourceName); // Since dropExisting is set to true... // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine); // verify: expectedIntervalAfterCompaction = new ArrayList<>(); // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be @@ -1245,8 +1271,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment(CompactionEngine engine) throws Exception { updateCompactionTaskSlot(1, 1, null); final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); @@ -1263,7 +1289,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null), - false + false, + engine ); // Before compaction, we have segments with the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z // We will compact the latest segment to MONTH. @@ -1292,8 +1319,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithRollup() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception { final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); @@ -1309,7 +1336,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(null, null, true), - false + false, + engine ); forceTriggerAutoCompaction(2); queryAndResultFields = ImmutableMap.of( @@ -1328,8 +1356,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithQueryGranularity() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithQueryGranularity(CompactionEngine engine) throws Exception { final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles")); Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); @@ -1345,7 +1373,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null), - false + false, + engine ); forceTriggerAutoCompaction(2); queryAndResultFields = ImmutableMap.of( @@ -1364,8 +1393,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - @Test - public void testAutoCompactionDutyWithDimensionsSpec() throws Exception + @Test(dataProvider = "engine") + public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) throws Exception { // Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", // "namespace", "continent", "country", "region", "city" @@ -1392,7 +1421,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), null, null, - false + false, + engine ); forceTriggerAutoCompaction(2); @@ -1760,30 +1790,6 @@ public class ITAutoCompactionTest extends AbstractIndexerTest ); } - private void submitCompactionConfig( - PartitionsSpec partitionsSpec, - Period skipOffsetFromLatest, - int maxNumConcurrentSubTasks, - UserCompactionTaskGranularityConfig granularitySpec, - UserCompactionTaskDimensionsConfig dimensionsSpec, - UserCompactionTaskTransformConfig transformSpec, - AggregatorFactory[] metricsSpec, - boolean dropExisting - ) throws Exception - { - submitCompactionConfig( - partitionsSpec, - skipOffsetFromLatest, - maxNumConcurrentSubTasks, - granularitySpec, - dimensionsSpec, - transformSpec, - metricsSpec, - dropExisting, - null - ); - } - private void submitCompactionConfig( PartitionsSpec partitionsSpec, Period skipOffsetFromLatest, @@ -1829,7 +1835,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest transformSpec, !dropExisting ? null : new UserCompactionTaskIOConfig(true), engine, - null + ImmutableMap.of("maxNumTasks", 2) ); compactionResource.submitCompactionConfig(compactionConfig); diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index 74504184608..0f92d99db10 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -32,7 +32,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -121,7 +120,6 @@ public class ClientCompactionRunnerInfo )); } validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext())); - validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec())); return validationResults.stream() .filter(result -> !result.isValid()) .findFirst() @@ -183,28 +181,4 @@ public class ClientCompactionRunnerInfo } return CompactionConfigValidationResult.success(); } - - /** - * Validate each metric has output column name same as the input name. - */ - public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec) - { - if (metricsSpec == null) { - return CompactionConfigValidationResult.success(); - } - return Arrays.stream(metricsSpec) - .filter(aggregatorFactory -> - !(aggregatorFactory.requiredFields().isEmpty() - || aggregatorFactory.requiredFields().size() == 1 - && aggregatorFactory.requiredFields() - .get(0) - .equals(aggregatorFactory.getName()))) - .findFirst() - .map(aggregatorFactory -> - CompactionConfigValidationResult.failure( - "MSQ: Different name[%s] and fieldName(s)[%s] for aggregator", - aggregatorFactory.getName(), - aggregatorFactory.requiredFields() - )).orElse(CompactionConfigValidationResult.success()); - } } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java new file mode 100644 index 00000000000..b2cb90bc0ce --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.indexing; + +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; + +import javax.annotation.Nullable; + +/** + * Class representing the combined DataSchema of a set of segments, currently used only by Compaction. + */ +public class CombinedDataSchema extends DataSchema +{ + private final boolean hasRolledUpSegments; + + public CombinedDataSchema( + String dataSource, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators, + GranularitySpec granularitySpec, + TransformSpec transformSpec, + @Nullable boolean hasRolledUpSegments + ) + { + super( + dataSource, + timestampSpec, + dimensionsSpec, + aggregators, + granularitySpec, + transformSpec, + null, + null + ); + this.hasRolledUpSegments = hasRolledUpSegments; + } + + public boolean hasRolledUpSegments() + { + return hasRolledUpSegments; + } +} diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index 01daa1da4af..7742eaaf138 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -129,7 +129,7 @@ public class ClientCompactionRunnerInfoTest } @Test - public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid() + public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid() { DataSourceCompactionConfig compactionConfig = createCompactionConfig( new DynamicPartitionsSpec(3, null), @@ -148,29 +148,6 @@ public class ClientCompactionRunnerInfoTest ); } - @Test - public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() - { - // Aggregators having different input and ouput column names are unsupported. - final String inputColName = "added"; - final String outputColName = "sum_added"; - DataSourceCompactionConfig compactionConfig = createCompactionConfig( - new DynamicPartitionsSpec(3, null), - Collections.emptyMap(), - new UserCompactionTaskGranularityConfig(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} - ); - CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( - compactionConfig, - CompactionEngine.NATIVE - ); - Assert.assertFalse(validationResult.isValid()); - Assert.assertEquals( - "MSQ: Different name[sum_added] and fieldName(s)[[added]] for aggregator", - validationResult.getReason() - ); - } - @Test public void testMSQEngineWithRollupNullWithMetricsSpecIsValid() { diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java index 78294fca0c4..87ddac50f01 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java @@ -684,4 +684,24 @@ public class DataSchemaTest extends InitializedNullHandlingTest Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap()); } + + @Test + public void testCombinedDataSchemaSetsHasRolledUpSegments() + { + CombinedDataSchema schema = new CombinedDataSchema( + IdUtilsTest.VALID_ID_CHARS, + new TimestampSpec("time", "auto", null), + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1")) + ) + .setDimensionExclusions(ImmutableList.of("dimC")) + .build(), + null, + new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + null, + true + ); + Assert.assertTrue(schema.hasRolledUpSegments()); + } }