Do not support non-idempotent aggregator in MSQ compaction (#16846)

This PR adds validation checks for DataSourceCompactionConfig and CompactionTask with the MSQ engine to ensure that:

each aggregator in metricsSpec is idempotent (i.e. equal to its combining factory)
metricsSpec is non-null and non-empty when rollup is set to true
Unit tests and existing compaction ITs have been updated accordingly.
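
As a quick illustration (not part of the diff): the idempotency rule enforced by these checks is that an aggregator must equal its combining factory, which for simple aggregators means the output column name matches the input column name. A minimal sketch using Druid's LongSumAggregatorFactory; the class name is hypothetical, and the column names mirror those used in the PR's tests:

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class IdempotencyCheckSketch
{
  public static void main(String[] args)
  {
    // Idempotent: output name equals input name, so the factory equals its combining factory.
    AggregatorFactory idempotent = new LongSumAggregatorFactory("added", "added");
    System.out.println(idempotent.equals(idempotent.getCombiningFactory())); // true

    // Non-idempotent: output name differs from input name; MSQ compaction now rejects this.
    AggregatorFactory nonIdempotent = new LongSumAggregatorFactory("sum_added", "added");
    System.out.println(nonIdempotent.equals(nonIdempotent.getCombiningFactory())); // false
  }
}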
Vishesh Garg 2024-08-06 20:58:08 +05:30 committed by GitHub
parent aa49be61ea
commit 593c3b2150
12 changed files with 225 additions and 340 deletions

View File

@ -66,7 +66,6 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@ -120,15 +119,14 @@ public class MSQCompactionRunner implements CompactionRunner
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>rollup set to false in granularitySpec when metricsSpec is specified. Null is treated as true.</li>
* <li>queryGranularity set to ALL in granularitySpec.</li>
* <li>Each metric has output column name same as the input name.</li>
* <li>rollup in granularitySpec set to false when metricsSpec is specified, or set to true when metricsSpec is null or empty.
* A null rollup is treated as true if metricsSpec is non-empty and as false if it is empty.</li>
* <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
* </ul>
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
CompactionTask compactionTask
)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
@ -144,57 +142,13 @@ public class MSQCompactionRunner implements CompactionRunner
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(CompactionConfigValidationResult.success());
}
/**
* Validates that there are no rolled-up segments where either:
* <ul>
* <li>aggregator factory differs from its combining factory </li>
* <li>input col name is different from the output name (non-idempotent)</li>
* </ul>
*/
private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
{
for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
if (combinedDataSchema.hasRolledUpSegments()) {
for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
// This is a conservative check: the existing rollup may have used an idempotent aggregator while the one
// provided in the compaction spec is not. Such a case would compact correctly yet still fails this pre-check.
if (
!(
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
(
aggregatorFactory.requiredFields().isEmpty() ||
(aggregatorFactory.requiredFields().size() == 1 &&
aggregatorFactory.requiredFields()
.get(0)
.equals(aggregatorFactory.getName()))
)
)
) {
// MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
// the aggregated column name. This is because the aggregated values would then get overwritten by new
// values and the existing values would be lost. Note that if no rollup is specified in an index spec,
// the default value is true.
return CompactionConfigValidationResult.failure(
"MSQ: Rolled-up segments in compaction interval[%s].",
intervalDataSchema.getKey()
);
}
}
}
}
}
return CompactionConfigValidationResult.success();
}
@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{

View File

@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.math.expr.ExprMacroTable;
@ -60,7 +59,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
@ -131,7 +129,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@ -144,7 +142,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@ -157,7 +155,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@ -170,7 +168,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@ -183,7 +181,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
@ -196,7 +194,41 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, Collections.emptyMap()).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
public void testRollupTrueWithoutMetricsSpecIsInValid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
}
@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
{
// Aggregators having different input and output column names are unsupported.
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
validationResult.getReason()
);
}
@Test
@ -345,48 +377,6 @@ public class MSQCompactionRunnerTest
Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy());
}
@Test
public void testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails()
{
final String inputColName = "added";
final String outputColName = "sum_added";
CompactionTask compactionTask = createCompactionTask(
null,
null,
Collections.emptyMap(),
null,
new AggregatorFactory[]{
new LongSumAggregatorFactory(
outputColName,
inputColName
)
}
);
CombinedDataSchema dataSchema = new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
new UniformGranularitySpec(
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
false,
Collections.singletonList(COMPACTION_INTERVAL)
),
null,
true
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(validationResult.getReason(), StringUtils.format(
"MSQ: Rolled-up segments in compaction interval[%s].",
COMPACTION_INTERVAL
));
}
private CompactionTask createCompactionTask(
@Nullable PartitionsSpec partitionsSpec,
@Nullable DimFilter dimFilter,

View File

@ -57,9 +57,6 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
);
CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
}

View File

@ -77,7 +77,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@ -460,13 +459,11 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
transformSpec,
metricsSpec,
granularitySpec,
getMetricBuilder(),
compactionRunner
getMetricBuilder()
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
CompactionConfigValidationResult supportsCompactionConfig =
compactionRunner.validateCompactionTask(this, intervalDataSchemas);
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
@ -488,8 +485,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final ServiceMetricEvent.Builder metricBuilder,
CompactionRunner compactionRunner
final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final Iterable<DataSegment> timelineSegments = retrieveRelevantTimelineHolders(
@ -553,8 +549,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse),
compactionRunner
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
intervalDataSchemaMap.put(interval, dataSchema);
}
@ -579,8 +574,7 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
compactionRunner
granularitySpec
);
return Collections.singletonMap(segmentProvider.interval, dataSchema);
}
@ -610,17 +604,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable CompactionRunner compactionRunner
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
// For MSQ, always need rollup to check if there are some rollup segments already present.
compactionRunner instanceof NativeCompactionRunner
? (granularitySpec.isRollup() == null)
: true,
granularitySpec.isRollup() == null,
granularitySpec.getQueryGranularity() == null,
dimensionsSpec == null,
metricsSpec == null
@ -675,14 +665,13 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
finalMetricsSpec = metricsSpec;
}
return new CombinedDataSchema(
return new DataSchema(
dataSource,
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null),
existingSegmentAnalyzer.hasRolledUpSegments()
transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
);
}
@ -759,7 +748,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// For processRollup:
private boolean rollup = true;
private boolean hasRolledUpSegments = false;
// For processQueryGranularity:
private Granularity queryGranularity;
@ -827,11 +815,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
return rollup;
}
public boolean hasRolledUpSegments()
{
return hasRolledUpSegments;
}
public Granularity getQueryGranularity()
{
if (!needQueryGranularity) {
@ -921,7 +904,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
hasRolledUpSegments = hasRolledUpSegments || Boolean.valueOf(true).equals(isIndexRollup);
}
private void processQueryGranularity(final QueryableIndex index)

View File

@ -85,8 +85,7 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
CompactionTask compactionTask
)
{
return CompactionConfigValidationResult.success();

View File

@ -749,8 +749,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -811,8 +810,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -874,8 +872,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -938,8 +935,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1009,8 +1005,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1060,8 +1055,7 @@ public class CompactionTaskTest
null,
customMetricsSpec,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1104,8 +1098,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1155,8 +1148,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@ -1186,8 +1178,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
NativeCompactionRunner.createIngestionSpecs(
@ -1228,8 +1219,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1273,8 +1263,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
dataSchemasForIntervals,
@ -1319,8 +1308,7 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
null
),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1367,8 +1355,7 @@ public class CompactionTaskTest
null,
null,
null,
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1413,8 +1400,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1459,8 +1445,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
@ -1490,8 +1475,7 @@ public class CompactionTaskTest
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
METRIC_BUILDER,
null
METRIC_BUILDER
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(

View File

@ -80,7 +80,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@ -184,7 +183,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
false
false,
CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93.0
@ -286,7 +286,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
false
false,
CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93
@ -328,8 +329,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test(dataProvider = "engine")
public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine engine) throws Exception
@Test()
public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
{
// added = 31, count = null, sum_added = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
@ -357,7 +358,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
false,
engine
CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 2, sum_added = 62
@ -480,7 +481,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
false
false,
CompactionEngine.NATIVE
);
// should now only have 1 row after compaction
// added = null, count = 4, sum_added = 124
@ -521,7 +523,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), CompactionEngine.NATIVE);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
@ -539,7 +541,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
0,
1,
1);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
@ -651,7 +653,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, CompactionEngine.NATIVE);
// ...should remains unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@ -863,7 +865,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
true,
engine
);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
@ -881,7 +889,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
newGranularity = Granularities.MONTH;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
true,
engine
);
// Since dropExisting is set to true...
// Again data is only in two days
@ -950,7 +964,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
false,
CompactionEngine.NATIVE
);
LOG.info("Auto compaction test with YEAR segment granularity");
@ -967,7 +987,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
submitCompactionConfig(
1000,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
false,
CompactionEngine.NATIVE
);
LOG.info("Auto compaction test with DAY segment granularity");
@ -1169,7 +1195,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
false,
CompactionEngine.NATIVE
);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@ -1195,7 +1227,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null),
false,
CompactionEngine.NATIVE
);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@ -1239,7 +1277,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
false
false,
CompactionEngine.NATIVE
);
// Before compaction, we have segments with the interval 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01
// We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
@ -1319,8 +1358,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test(dataProvider = "engine")
public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws Exception
@Test()
public void testAutoCompactionDutyWithRollup() throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@ -1337,7 +1376,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
false,
engine
CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
queryAndResultFields = ImmutableMap.of(
@ -1470,7 +1509,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
new UserCompactionTaskTransformConfig(new SelectorDimFilter("page", "Striker Eureka", null)),
null,
false
false,
CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@ -1517,7 +1557,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
new AggregatorFactory[] {new DoubleSumAggregatorFactory("double_sum_added", "added"), new LongSumAggregatorFactory("long_sum_added", "added")},
false
false,
CompactionEngine.NATIVE
);
forceTriggerAutoCompaction(2);
@ -1577,7 +1618,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
null,
false
false,
CompactionEngine.NATIVE
);
// Compact the MONTH segment
forceTriggerAutoCompaction(2);
@ -1679,57 +1721,31 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
queryHelper.testQueriesFromString(queryResponseTemplate);
}
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest)
throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, null);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
@Nullable CompactionEngine engine
CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, engine);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, null);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
@Nullable CompactionEngine engine
CompactionEngine engine
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false, engine);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting
) throws Exception
{
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, dropExisting, null);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting,
@Nullable CompactionEngine engine
CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@ -1744,28 +1760,6 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
UserCompactionTaskGranularityConfig granularitySpec,
UserCompactionTaskDimensionsConfig dimensionsSpec,
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting
) throws Exception
{
submitCompactionConfig(
maxRowsPerSegment,
skipOffsetFromLatest,
granularitySpec,
dimensionsSpec,
transformSpec,
metricsSpec,
dropExisting,
null
);
}
private void submitCompactionConfig(
Integer maxRowsPerSegment,
Period skipOffsetFromLatest,
@ -1774,7 +1768,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
@Nullable CompactionEngine engine
CompactionEngine engine
) throws Exception
{
submitCompactionConfig(
@ -1799,7 +1793,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
UserCompactionTaskTransformConfig transformSpec,
AggregatorFactory[] metricsSpec,
boolean dropExisting,
@Nullable CompactionEngine engine
CompactionEngine engine
) throws Exception
{
DataSourceCompactionConfig dataSourceCompactionConfig = new DataSourceCompactionConfig(

View File

@ -32,6 +32,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -102,9 +103,8 @@ public class ClientCompactionRunnerInfo
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>rollup set to false in granularitySpec when metricsSpec is specified. Null is treated as true.</li>
* <li>queryGranularity set to ALL in granularitySpec.</li>
* <li>Each metric has output column name same as the input name.</li>
* <li>rollup in granularitySpec set to false when metricsSpec is specified, or set to true when metricsSpec is null or empty.</li>
* <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
* </ul>
*/
private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
@ -120,6 +120,7 @@ public class ClientCompactionRunnerInfo
));
}
validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext()));
validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
@ -149,17 +150,23 @@ public class ClientCompactionRunnerInfo
}
/**
* Validate rollup is set to false in granularitySpec when metricsSpec is specified.
* Validates that rollup in granularitySpec is set to true when metricsSpec is specified and to false when it is null or empty.
* If rollup is set to null, all existing segments are analyzed, and rollup is set to true iff all segments have rollup
* set to true.
*/
public static CompactionConfigValidationResult validateRollupForMSQ(
AggregatorFactory[] metricsSpec,
@Nullable Boolean isRollup
)
{
if (metricsSpec != null && isRollup != null && !isRollup) {
if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
);
} else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
);
}
return CompactionConfigValidationResult.success();
}
@ -181,4 +188,23 @@ public class ClientCompactionRunnerInfo
}
return CompactionConfigValidationResult.success();
}
/**
* Validates that each metric is idempotent, i.e. each aggregatorFactory 'A' satisfies 'A = A.combiningFactory()'.
*/
public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
if (metricsSpec == null) {
return CompactionConfigValidationResult.success();
}
return Arrays.stream(metricsSpec)
.filter(aggregatorFactory -> !aggregatorFactory.equals(aggregatorFactory.getCombiningFactory()))
.findFirst()
.map(aggregatorFactory ->
CompactionConfigValidationResult.failure(
"MSQ: Non-idempotent aggregator[%s] not supported in 'metricsSpec'.",
aggregatorFactory.getName()
)
).orElse(CompactionConfigValidationResult.success());
}
}
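
For context, a brief usage sketch of the new validateMetricsSpecForMSQ helper shown above (not part of the change; the aggregator names mirror those in the PR's tests):

// Non-idempotent because the output name differs from the input name.
AggregatorFactory[] metricsSpec = new AggregatorFactory[]{
    new LongSumAggregatorFactory("sum_added", "added")
};
CompactionConfigValidationResult result =
    ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(metricsSpec);
// result.isValid() is false and result.getReason() is
// "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'."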

View File

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.indexing;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import javax.annotation.Nullable;
/**
* Class representing the combined DataSchema of a set of segments, currently used only by Compaction.
*/
public class CombinedDataSchema extends DataSchema
{
private final boolean hasRolledUpSegments;
public CombinedDataSchema(
String dataSource,
@Nullable TimestampSpec timestampSpec,
@Nullable DimensionsSpec dimensionsSpec,
AggregatorFactory[] aggregators,
GranularitySpec granularitySpec,
TransformSpec transformSpec,
@Nullable boolean hasRolledUpSegments
)
{
super(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
null,
null
);
this.hasRolledUpSegments = hasRolledUpSegments;
}
public boolean hasRolledUpSegments()
{
return hasRolledUpSegments;
}
}

View File

@ -52,7 +52,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
@ -72,7 +72,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithMaxTotalRowsIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
@ -105,7 +105,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
@ -118,7 +118,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithQueryGranularityAllIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
@ -131,7 +131,7 @@ public class ClientCompactionRunnerInfoTest
@Test
public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
@ -148,10 +148,53 @@ public class ClientCompactionRunnerInfoTest
);
}
@Test
public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, true),
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
validationResult.getReason()
);
}
@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having combiningFactory different from the aggregatorFactory are unsupported.
final String inputColName = "added";
final String outputColName = "sum_added";
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
validationResult.getReason()
);
}
@Test
public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
@ -161,7 +204,7 @@ public class ClientCompactionRunnerInfoTest
.isValid());
}
private static DataSourceCompactionConfig createCompactionConfig(
private static DataSourceCompactionConfig createMSQCompactionConfig(
PartitionsSpec partitionsSpec,
Map<String, Object> context,
@Nullable UserCompactionTaskGranularityConfig granularitySpec,

View File

@ -684,24 +684,4 @@ public class DataSchemaTest extends InitializedNullHandlingTest
Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
}
@Test
public void testCombinedDataSchemaSetsHasRolledUpSegments()
{
CombinedDataSchema schema = new CombinedDataSchema(
IdUtilsTest.VALID_ID_CHARS,
new TimestampSpec("time", "auto", null),
DimensionsSpec.builder()
.setDimensions(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1"))
)
.setDimensionExclusions(ImmutableList.of("dimC"))
.build(),
null,
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
true
);
Assert.assertTrue(schema.hasRolledUpSegments());
}
}

View File

@ -202,7 +202,7 @@ public class CoordinatorCompactionConfigsResourceTest
.withInputSegmentSizeBytes(1000L)
.withSkipOffsetFromLatest(Period.hours(3))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.DAY, null, true)
new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)
)
.withEngine(CompactionEngine.MSQ)
.build();