diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 7b4d3235dc0..efc2cbb2afb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -66,7 +66,6 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@@ -120,15 +119,14 @@ public class MSQCompactionRunner implements CompactionRunner
*
* - partitionsSpec of type HashedPartitionsSpec.
* - maxTotalRows in DynamicPartitionsSpec.
- * - rollup set to false in granularitySpec when metricsSpec is specified. Null is treated as true.
- * - queryGranularity set to ALL in granularitySpec.
- * - Each metric has output column name same as the input name.
+ * - rollup in granularitySpec set to false when a metricsSpec is specified, or set to true when metricsSpec
+ * is null or empty. A null rollup is treated as true when metricsSpec is non-empty and as false when it is empty.
+ * - a non-idempotent aggregator in metricsSpec, i.e. some aggregatorFactory 'A' such that
+ * 'A != A.getCombiningFactory()'.
*
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
+ CompactionTask compactionTask
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@@ -144,57 +142,13 @@ public class MSQCompactionRunner implements CompactionRunner
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
- validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
+ validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(CompactionConfigValidationResult.success());
}
- /**
- * Valides that there are no rolled-up segments where either:
- *
- * - aggregator factory differs from its combining factory
- * - input col name is different from the output name (non-idempotent)
- *
- */
- private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
- {
- for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
- if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
- CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
- if (combinedDataSchema.hasRolledUpSegments()) {
- for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
- // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in
- // compaction spec isn't. This would get properly compacted yet fails in the below pre-check.
- if (
- !(
- aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
- (
- aggregatorFactory.requiredFields().isEmpty() ||
- (aggregatorFactory.requiredFields().size() == 1 &&
- aggregatorFactory.requiredFields()
- .get(0)
- .equals(aggregatorFactory.getName()))
- )
- )
- ) {
- // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
- // the aggregated column name. This is because the aggregated values would then get overwritten by new
- // values and the existing values would be lost. Note that if no rollup is specified in an index spec,
- // the default value is true.
- return CompactionConfigValidationResult.failure(
- "MSQ: Rolled-up segments in compaction interval[%s].",
- intervalDataSchema.getKey()
- );
- }
- }
- }
- }
- }
- return CompactionConfigValidationResult.success();
- }
-
@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
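Note on the idempotency rule in the javadoc above: an aggregator is idempotent here when it equals its own combining factory, so re-aggregating already-rolled-up rows cannot overwrite values. A minimal sketch of the distinction, assuming Druid's `LongSumAggregatorFactory` is on the classpath (column names are illustrative):

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class IdempotencyExample
{
  public static void main(String[] args)
  {
    // Output column differs from the input column: the combining factory reads
    // "sum_added" rather than "added", so A != A.getCombiningFactory() and MSQ
    // compaction rejects it.
    AggregatorFactory nonIdempotent = new LongSumAggregatorFactory("sum_added", "added");
    System.out.println(nonIdempotent.equals(nonIdempotent.getCombiningFactory())); // false

    // Output column equals the input column: the combining factory is the same
    // aggregator, so re-aggregating rolled-up rows is safe.
    AggregatorFactory idempotent = new LongSumAggregatorFactory("added", "added");
    System.out.println(idempotent.equals(idempotent.getCombiningFactory())); // true
  }
}
```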
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 6c5d1957265..d868ddf20e5 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -60,7 +59,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
@@ -131,7 +129,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -144,7 +142,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -157,7 +155,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -170,7 +168,7 @@ public class MSQCompactionRunnerTest
null,
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -183,7 +181,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
- Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@@ -196,7 +194,41 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
- Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ }
+
+ @Test
+ public void testRollupTrueWithoutMetricsSpecIsInvalid()
+ {
+ CompactionTask compactionTask = createCompactionTask(
+ new DynamicPartitionsSpec(3, null),
+ null,
+ Collections.emptyMap(),
+ new ClientCompactionTaskGranularitySpec(null, null, true),
+ null
+ );
+ Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+ }
+
+ @Test
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
+ {
+ // Aggregators having different input and output column names are unsupported.
+ final String inputColName = "added";
+ final String outputColName = "sum_added";
+ CompactionTask compactionTask = createCompactionTask(
+ new DynamicPartitionsSpec(3, null),
+ null,
+ Collections.emptyMap(),
+ new ClientCompactionTaskGranularitySpec(null, null, null),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+ );
+ CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ validationResult.getReason()
+ );
}
@Test
@@ -345,48 +377,6 @@ public class MSQCompactionRunnerTest
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
}
- @Test
- public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails()
- {
- final String inputColName = "added";
- final String outputColName = "sum_added";
- CompactionTask compactionTask = createCompactionTask(
- null,
- null,
- Collections.emptyMap(),
- null,
- new AggregatorFactory[]{
- new LongSumAggregatorFactory(
- outputColName,
- inputColName
- )
- }
- );
- CombinedDataSchema dataSchema = new CombinedDataSchema(
- DATA_SOURCE,
- new TimestampSpec(TIMESTAMP_COLUMN, null, null),
- new DimensionsSpec(DIMENSIONS),
- new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
- new UniformGranularitySpec(
- SEGMENT_GRANULARITY.getDefaultGranularity(),
- null,
- false,
- Collections.singletonList(COMPACTION_INTERVAL)
- ),
- null,
- true
- );
- CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
- compactionTask,
- Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
- );
- Assert.assertFalse(validationResult.isValid());
- Assert.assertEquals(validationResult.getReason(), StringUtils.format(
- "MSQ: Rolled-up segments in compaction interval[%s].",
- COMPACTION_INTERVAL
- ));
- }
-
private CompactionTask createCompactionTask(
@Nullable PartitionsSpec partitionsSpec,
@Nullable DimFilter dimFilter,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 0abaeed8eb2..8d30a60d04e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,9 +57,6 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
- CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
- );
+ CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 68320387845..8659eb0f397 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -77,7 +77,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -460,13 +459,11 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
transformSpec,
metricsSpec,
granularitySpec,
- getMetricBuilder(),
- compactionRunner
+ getMetricBuilder()
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
- CompactionConfigValidationResult supportsCompactionConfig =
- compactionRunner.validateCompactionTask(this, intervalDataSchemas);
+ CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
@@ -488,8 +485,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
- final ServiceMetricEvent.Builder metricBuilder,
- CompactionRunner compactionRunner
+ final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final Iterable timelineSegments = retrieveRelevantTimelineHolders(
@@ -553,8 +549,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
- : granularitySpec.withSegmentGranularity(segmentGranularityToUse),
- compactionRunner
+ : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
intervalDataSchemaMap.put(interval, dataSchema);
}
@@ -579,8 +574,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
dimensionsSpec,
transformSpec,
metricsSpec,
- granularitySpec,
- compactionRunner
+ granularitySpec
);
return Collections.singletonMap(segmentProvider.interval, dataSchema);
}
@@ -610,17 +604,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
- @Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
- @Nullable CompactionRunner compactionRunner
+ @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
- // For MSQ, always need rollup to check if there are some rollup segments already present.
- compactionRunner instanceof NativeCompactionRunner
- ? (granularitySpec.isRollup() == null)
- : true,
+ granularitySpec.isRollup() == null,
granularitySpec.getQueryGranularity() == null,
dimensionsSpec == null,
metricsSpec == null
@@ -675,14 +665,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
finalMetricsSpec = metricsSpec;
}
- return new CombinedDataSchema(
+ return new DataSchema(
dataSource,
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
- transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null),
- existingSegmentAnalyzer.hasRolledUpSegments()
+ transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
);
}
@@ -759,7 +748,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// For processRollup:
private boolean rollup = true;
- private boolean hasRolledUpSegments = false;
// For processQueryGranularity:
private Granularity queryGranularity;
@@ -827,11 +815,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
return rollup;
}
- public boolean hasRolledUpSegments()
- {
- return hasRolledUpSegments;
- }
-
public Granularity getQueryGranularity()
{
if (!needQueryGranularity) {
@@ -921,7 +904,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// Pick rollup value if all segments being compacted have the same non-null value; otherwise set it to false
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
- hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
}
private void processQueryGranularity(final QueryableIndex index)
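For reference, the surviving processRollup logic above folds the carried-over rollup flag across all analyzed segments: it stays true only while every segment's metadata reports rollup as true. A standalone sketch of that fold, using hypothetical per-segment metadata values:

```java
import java.util.Arrays;
import java.util.List;

public class RollupFoldSketch
{
  public static void main(String[] args)
  {
    // Per-segment rollup flags as read from segment metadata; null means unknown.
    List<Boolean> segmentRollupFlags = Arrays.asList(Boolean.TRUE, null, Boolean.TRUE);

    boolean rollup = true;
    for (Boolean isIndexRollup : segmentRollupFlags) {
      // Mirrors CompactionTask: rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
      rollup = rollup && Boolean.TRUE.equals(isIndexRollup);
    }

    System.out.println(rollup); // false: one segment has an unknown (null) rollup flag
  }
}
```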
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 5aa7af71451..2074d14f0f9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,8 +85,7 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
- CompactionTask compactionTask,
- Map<Interval, DataSchema> intervalToDataSchemaMap
+ CompactionTask compactionTask
)
{
return CompactionConfigValidationResult.success();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 3a386bc4aa7..f9849b1483d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -749,8 +749,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -811,8 +810,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -874,8 +872,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -938,8 +935,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1009,8 +1005,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1060,8 +1055,7 @@ public class CompactionTaskTest
null,
customMetricsSpec,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1104,8 +1098,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1155,8 +1148,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@@ -1186,8 +1178,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@@ -1228,8 +1219,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1273,8 +1263,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
dataSchemasForIntervals,
@@ -1319,8 +1308,7 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
null
),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1367,8 +1355,7 @@ public class CompactionTaskTest
null,
null,
null,
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1413,8 +1400,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1459,8 +1445,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@@ -1490,8 +1475,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
- METRIC_BUILDER,
- null
+ METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index d09bced3313..8f070f33405 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -80,7 +80,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -184,7 +183,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93.0
@@ -286,7 +286,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93
@@ -328,8 +329,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
- @Test(dataProvider = "engine")
- public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception
+ @Test
+ public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
{
// added = 31, count = null, sum_added = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
@@ -357,7 +358,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
false,
- engine
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 2, sum_added = 62
@@ -480,7 +481,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
- false
+ false,
+ CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 4, sum_added = 124
@@ -521,7 +523,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), CompactionEngine.NATIVE);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -539,7 +541,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
0,
1,
1);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -651,7 +653,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
// ...should remain unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -863,7 +865,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ true,
+ engine
+ );
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
@@ -881,7 +889,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
newGranularity = Granularities.MONTH;
// Set dropExisting to true
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ true,
+ engine
+ );
// Since dropExisting is set to true...
// Again data is only in two days
@@ -950,7 +964,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -967,7 +987,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ 1000,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
LOG.info("Auto compaction test with DAY segment granularity");
@@ -1169,7 +1195,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@@ -1195,7 +1227,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+ false,
+ CompactionEngine.NATIVE
+ );
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@@ -1239,7 +1277,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
- false
+ false,
+ CompactionEngine.NATIVE
);
// Before compaction, we have segments with the interval 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01
// We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
@@ -1319,8 +1358,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
- @Test(dataProvider = "engine")
- public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception
+ @Test
+ public void testAutoCompactionDutyWithRollup() throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@@ -1337,7 +1376,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
false,
- engine
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
queryAndResultFields = ImmutableMap.of(
@@ -1470,7 +1509,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new UserCompactionTaskTransformConfig(new SelectorDimFilter("page", "Striker Eureka", null)),
null,
- false
+ false,
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@@ -1517,7 +1557,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
new AggregatorFactory[] {new DoubleSumAggregatorFactory("double_sum_added", "added"), new LongSumAggregatorFactory("long_sum_added", "added")},
- false
+ false,
+ CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@@ -1577,7 +1618,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
null,
- false
+ false,
+ CompactionEngine.NATIVE
);
// Compact the MONTH segment
forceTriggerAutoCompaction(2);
@@ -1679,57 +1721,31 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
queryHelper.testQueriesFromString(queryResponseTemplate);
}
- private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest)
- throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, null);
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, engine);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec
- ) throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, null);
- }
-
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, engine);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec,
- boolean dropExisting
- ) throws Exception
- {
- submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, dropExisting, null);
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@@ -1744,28 +1760,6 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
);
}
- private void submitCompactionConfig(
- Integer maxRowsPerSegment,
- Period skipOffsetFromLatest,
- UserCompactionTaskGranularityConfig granularitySpec,
- UserCompactionTaskDimensionsConfig dimensionsSpec,
- UserCompactionTaskTransformConfig transformSpec,
- AggregatorFactory[] metricsSpec,
- boolean dropExisting
- ) throws Exception
- {
- submitCompactionConfig(
- maxRowsPerSegment,
- skipOffsetFromLatest,
- granularitySpec,
- dimensionsSpec,
- transformSpec,
- metricsSpec,
- dropExisting,
- null
- );
- }
-
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
@@ -1774,7 +1768,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@@ -1799,7 +1793,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
- @Nullable CompactionEngine engine
+ CompactionEngine engine
) throws Exception
{
DataSourceCompactionConfig dataSourceCompactionConfig = new DataSourceCompactionConfig(
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 0f92d99db10..806b35e9481 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -32,6 +32,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -102,9 +103,8 @@ public class ClientCompactionRunnerInfo
*
* - partitionsSpec of type HashedPartitionsSpec.
* - maxTotalRows in DynamicPartitionsSpec.
- * - rollup set to false in granularitySpec when metricsSpec is specified. Null is treated as true.
- * - queryGranularity set to ALL in granularitySpec.
- * - Each metric has output column name same as the input name.
+ * - rollup in granularitySpec set to false when a metricsSpec is specified, or set to true when metricsSpec
+ * is null or empty.
+ * - a non-idempotent aggregator in metricsSpec, i.e. some aggregatorFactory 'A' such that
+ * 'A != A.getCombiningFactory()'.
*
*/
private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
@@ -120,6 +120,7 @@ public class ClientCompactionRunnerInfo
));
}
validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext()));
+ validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
@@ -149,17 +150,23 @@ public class ClientCompactionRunnerInfo
}
/**
- * Validate rollup is set to false in granularitySpec when metricsSpec is specified.
+ * Validates that rollup in granularitySpec is not set to false when a non-empty metricsSpec is specified, and
+ * not set to true when metricsSpec is null or empty. If rollup is null, all existing segments are analyzed, and
+ * rollup is set to true iff all segments have rollup set to true.
*/
public static CompactionConfigValidationResult validateRollupForMSQ(
AggregatorFactory[] metricsSpec,
@Nullable Boolean isRollup
)
{
- if (metricsSpec != null && isRollup != null && !isRollup) {
+ if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
);
+ } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
+ return CompactionConfigValidationResult.failure(
+ "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
+ );
}
return CompactionConfigValidationResult.success();
}
@@ -181,4 +188,23 @@ public class ClientCompactionRunnerInfo
}
return CompactionConfigValidationResult.success();
}
+
+ /**
+ * Validates that every aggregator in metricsSpec is idempotent, i.e. that each aggregatorFactory 'A' satisfies
+ * 'A.equals(A.getCombiningFactory())'.
+ */
+ public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
+ {
+ if (metricsSpec == null) {
+ return CompactionConfigValidationResult.success();
+ }
+ return Arrays.stream(metricsSpec)
+ .filter(aggregatorFactory -> !aggregatorFactory.equals(aggregatorFactory.getCombiningFactory()))
+ .findFirst()
+ .map(aggregatorFactory ->
+ CompactionConfigValidationResult.failure(
+ "MSQ: Non-idempotent aggregator[%s] not supported in 'metricsSpec'.",
+ aggregatorFactory.getName()
+ )
+ ).orElse(CompactionConfigValidationResult.success());
+ }
}
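Taken together, the two static checks added in this file behave roughly as sketched below (illustrative only, assuming this class and Druid's aggregator factories are on the classpath; not an authoritative harness):

```java
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;

public class MSQValidationSketch
{
  public static void main(String[] args)
  {
    AggregatorFactory[] metrics = {new LongSumAggregatorFactory("added", "added")};

    // rollup explicitly false while metricsSpec is non-empty -> invalid.
    print(ClientCompactionRunnerInfo.validateRollupForMSQ(metrics, false));

    // rollup explicitly true while metricsSpec is null or empty -> invalid.
    print(ClientCompactionRunnerInfo.validateRollupForMSQ(null, true));

    // rollup null always passes this check; existing segments are analyzed later.
    print(ClientCompactionRunnerInfo.validateRollupForMSQ(metrics, null));

    // Non-idempotent aggregator (input "added", output "sum_added") -> invalid.
    print(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(
        new AggregatorFactory[]{new LongSumAggregatorFactory("sum_added", "added")}
    ));
  }

  private static void print(CompactionConfigValidationResult result)
  {
    System.out.println(result.isValid() ? "valid" : "invalid: " + result.getReason());
  }
}
```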
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java
deleted file mode 100644
index b2cb90bc0ce..00000000000
--- a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.indexing;
-
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.segment.transform.TransformSpec;
-
-import javax.annotation.Nullable;
-
-/**
- * Class representing the combined DataSchema of a set of segments, currently used only by Compaction.
- */
-public class CombinedDataSchema extends DataSchema
-{
- private final boolean hasRolledUpSegments;
-
- public CombinedDataSchema(
- String dataSource,
- @Nullable TimestampSpec timestampSpec,
- @Nullable DimensionsSpec dimensionsSpec,
- AggregatorFactory[] aggregators,
- GranularitySpec granularitySpec,
- TransformSpec transformSpec,
- @Nullable boolean hasRolledUpSegments
- )
- {
- super(
- dataSource,
- timestampSpec,
- dimensionsSpec,
- aggregators,
- granularitySpec,
- transformSpec,
- null,
- null
- );
- this.hasRolledUpSegments = hasRolledUpSegments;
- }
-
- public boolean hasRolledUpSegments()
- {
- return hasRolledUpSegments;
- }
-}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index 7742eaaf138..011a4640da3 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -52,7 +52,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
@@ -72,7 +72,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithMaxTotalRowsIsInvalid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
@@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
@@ -105,7 +105,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
@@ -118,7 +118,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithQueryGranularityAllIsValid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
@@ -131,7 +131,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
@@ -148,10 +148,53 @@ public class ClientCompactionRunnerInfoTest
);
}
+ @Test
+ public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid()
+ {
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+ new DynamicPartitionsSpec(3, null),
+ Collections.emptyMap(),
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ null
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
+ validationResult.getReason()
+ );
+ }
+
+ @Test
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
+ {
+ // Aggregators having combiningFactory different from the aggregatorFactory are unsupported.
+ final String inputColName = "added";
+ final String outputColName = "sum_added";
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+ new DynamicPartitionsSpec(3, null),
+ Collections.emptyMap(),
+ new UserCompactionTaskGranularityConfig(null, null, null),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+ );
+ CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
+ validationResult.getReason()
+ );
+ }
+
@Test
public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
{
- DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
@@ -161,7 +204,7 @@ public class ClientCompactionRunnerInfoTest
.isValid());
}
- private static DataSourceCompactionConfig createCompactionConfig(
+ private static DataSourceCompactionConfig createMSQCompactionConfig(
PartitionsSpec partitionsSpec,
Map<String, Object> context,
@Nullable UserCompactionTaskGranularityConfig granularitySpec,
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 87ddac50f01..78294fca0c4 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -684,24 +684,4 @@ public class DataSchemaTest extends InitializedNullHandlingTest
Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
}
-
- @Test
- public void testCombinedDataSchemaSetsHasRolledUpSegments()
- {
- CombinedDataSchema schema = new CombinedDataSchema(
- IdUtilsTest.VALID_ID_CHARS,
- new TimestampSpec("time", "auto", null),
- DimensionsSpec.builder()
- .setDimensions(
- DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1"))
- )
- .setDimensionExclusions(ImmutableList.of("dimC"))
- .build(),
- null,
- new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
- null,
- true
- );
- Assert.assertTrue(schema.hasRolledUpSegments());
- }
}
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index c44b9b2f358..24cbfbd9462 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -202,7 +202,7 @@ public class CoordinatorCompactionConfigsResourceTest
.withInputSegmentSizeBytes(1000L)
.withSkipOffsetFromLatest(Period.hours(3))
.withGranularitySpec(
- new UserCompactionTaskGranularityConfig(Granularities.DAY, null, true)
+ new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)
)
.withEngine(CompactionEngine.MSQ)
.build();