Fix issues with MSQ Compaction (#17250)

The patch makes the following changes:
1. Fixes a bug causing compaction to fail on array, complex, and other non-primitive-type columns.
2. Updates the compaction status check to account for partition dimensions when comparing dimension ordering.
3. Ensures only string columns are specified as partition dimensions when 'range' partitioning is used.
4. Ensures `rollup` is true if and only if `metricsSpec` is non-empty (see the sketch below).
5. Ensures disjoint intervals aren't submitted for compaction.
6. Adds `compactionReason` to the compaction task context.
Vishesh Garg 2024-10-06 21:48:26 +05:30 committed by GitHub
parent 7d9e6d36fd
commit 7e35e50052
10 changed files with 318 additions and 61 deletions
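A minimal sketch of the rule in item 4, restated outside the patch for clarity. The real check is the one-line comparison added to ClientCompactionRunnerInfo.validateRollupForMSQ further down in this diff; the wrapper class and method below are illustrative only.

import org.apache.druid.query.aggregation.AggregatorFactory;

public class RollupRuleSketch
{
  // Returns an error message when the combination is rejected, or null when it is acceptable.
  static String validate(AggregatorFactory[] metricsSpec, Boolean isRollup)
  {
    final boolean hasMetrics = metricsSpec != null && metricsSpec.length > 0;
    // rollup must be explicitly true exactly when metrics are present
    if (hasMetrics != Boolean.TRUE.equals(isRollup)) {
      return "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified";
    }
    return null;
  }
}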

File: MSQCompactionRunner.java

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -129,21 +130,35 @@ public class MSQCompactionRunner implements CompactionRunner
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
* Null is treated as true if metricsSpec exist and false if empty.</li>
* <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
* <li>Multiple disjoint intervals in compaction task</li>
* </ul>
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
if (intervalToDataSchemaMap.size() > 1) {
// We are currently not able to handle multiple intervals in the map for multiple reasons, one of them being that
// the subsequent worker ids clash -- since they are derived from MSQControllerTask ID which in turn is equal to
// CompactionTask ID for each sequentially launched MSQControllerTask.
return CompactionConfigValidationResult.failure(
"MSQ: Disjoint compaction intervals[%s] not supported",
intervalToDataSchemaMap.keySet()
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (compactionTask.getTuningConfig() != null) {
validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec())
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
)
);
}
if (compactionTask.getGranularitySpec() != null) {
@ -300,7 +315,7 @@ public class MSQCompactionRunner implements CompactionRunner
rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
}
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
}
// There can be columns that are part of metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
@ -416,7 +431,9 @@ public class MSQCompactionRunner implements CompactionRunner
{
if (dataSchema.getGranularitySpec() != null) {
// If rollup is true without any metrics, all columns are treated as dimensions and
// duplicate rows are removed in line with native compaction.
// duplicate rows are removed in line with native compaction. This case can only happen if rollup is
// specified as null in the compaction spec and is then inferred to be true by segment analysis. The metrics=null and
// rollup=true combination, in turn, can only have been recorded for natively ingested segments.
return dataSchema.getGranularitySpec().isRollup();
}
// If no rollup specified, decide based on whether metrics are present.
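The hunk above is the core of fix 1: the row signature is now built from dimensionSchema.getColumnType() instead of re-parsing the type name, which does not reliably map array and complex types. A minimal sketch of that construction, using a plain string and long dimension (the class and sample dimensions are illustrative, not part of the patch):

import java.util.Arrays;
import java.util.List;

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

public class RowSignatureSketch
{
  static RowSignature build(List<DimensionSchema> dimensions)
  {
    final RowSignature.Builder builder = RowSignature.builder();
    builder.add("__time", ColumnType.LONG);
    for (DimensionSchema dim : dimensions) {
      // getColumnType() carries the dimension's real type (including array/complex types),
      // whereas ColumnType.fromString(dim.getTypeName()) does not reliably map them.
      builder.add(dim.getName(), dim.getColumnType());
    }
    return builder.build();
  }

  public static void main(String[] args)
  {
    System.out.println(build(Arrays.asList(
        new StringDimensionSchema("string_dim"),
        new LongDimensionSchema("long_dim")
    )));
  }
}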

File: MSQCompactionRunnerTest.java

@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest
private static final int MAX_ROWS_PER_SEGMENT = 150000;
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
private static List<String> PARTITION_DIMENSIONS;
private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false);
private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null);
@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest
LONG_DIMENSION,
MV_STRING_DIMENSION
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null);
private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());
@BeforeClass
public static void setupClass()
{
NullHandling.initializeForTests();
}
final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema(
"string_dim",
PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName());
@Test
public void testMultipleDisjointCompactionIntervalsAreInvalid()
{
Map<Interval, DataSchema> intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS);
intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null);
CompactionTask compactionTask = createCompactionTask(
new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")),
null,
Collections.emptyMap(),
null,
null
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
intervalDataschemas
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()),
validationResult.getReason()
);
}
@Test
@ -136,11 +162,11 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
public void testDimensionRangePartitionsSpecIsValid()
public void testStringDimensionInRangePartitionsSpecIsValid()
{
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
@ -149,7 +175,29 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
public void testLongDimensionInRangePartitionsSpecIsInvalid()
{
List<String> longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName());
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false),
null,
Collections.emptyMap(),
null,
null
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec",
validationResult.getReason()
);
}
@Test
@ -162,7 +210,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@ -175,7 +223,7 @@ public class MSQCompactionRunnerTest
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@ -188,7 +236,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@ -201,7 +249,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@ -214,7 +262,7 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskGranularitySpec(null, null, true),
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}
@Test
@ -227,13 +275,16 @@ public class MSQCompactionRunnerTest
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, null),
new ClientCompactionTaskGranularitySpec(null, null, true),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
"MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}

File: CompactionRunner.java

@ -57,6 +57,9 @@ public interface CompactionRunner
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
);
}

File: CompactionTask.java

@ -470,7 +470,10 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
this,
intervalDataSchemas
);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}

File: NativeCompactionRunner.java

@ -85,7 +85,8 @@ public class NativeCompactionRunner implements CompactionRunner
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalDataSchemaMap
)
{
return CompactionConfigValidationResult.success();

File: ClientCompactionRunnerInfo.java

@ -21,12 +21,14 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -36,6 +38,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -102,16 +107,20 @@ public class ClientCompactionRunnerInfo
* Checks if the provided compaction config is supported by MSQ. The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's empty.</li>
* <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
* </ul>
*/
private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
{
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (newConfig.getTuningConfig() != null) {
validationResults.add(validatePartitionsSpecForMSQ(newConfig.getTuningConfig().getPartitionsSpec()));
validationResults.add(validatePartitionsSpecForMSQ(
newConfig.getTuningConfig().getPartitionsSpec(),
newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
));
}
if (newConfig.getGranularitySpec() != null) {
validationResults.add(validateRollupForMSQ(
@ -128,9 +137,13 @@ public class ClientCompactionRunnerInfo
}
/**
* Validate that partitionSpec is either 'dynamic` or 'range', and if 'dynamic', ensure 'maxTotalRows' is null.
* Validate that partitionSpec is either 'dynamic' or 'range'. If 'dynamic', ensure 'maxTotalRows' is null. If 'range',
* ensure all partition columns are of string type.
*/
public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(PartitionsSpec partitionsSpec)
public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
@Nullable PartitionsSpec partitionsSpec,
@Nullable List<DimensionSchema> dimensionSchemas
)
{
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
@ -146,11 +159,28 @@ public class ClientCompactionRunnerInfo
"MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
);
}
if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
Collectors.toMap(DimensionSchema::getName, Function.identity())
);
Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
.getPartitionDimensions()
.stream()
.filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
.findAny();
if (nonStringDimension.isPresent()) {
return CompactionConfigValidationResult.failure(
"MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec",
nonStringDimension.get(),
dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
);
}
}
return CompactionConfigValidationResult.success();
}
/**
* Validate rollup in granularitySpec is set to true when metricsSpec is specified and false if it's null.
* Validate rollup in granularitySpec is set to true iff metricsSpec is specified.
* If rollup is set to null, all existing segments are analyzed, and it is set to true iff all segments have rollup
* set to true.
*/
@ -159,13 +189,9 @@ public class ClientCompactionRunnerInfo
@Nullable Boolean isRollup
)
{
if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) {
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
);
} else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
return CompactionConfigValidationResult.failure(
"MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
"MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified"
);
}
return CompactionConfigValidationResult.success();
@ -190,7 +216,7 @@ public class ClientCompactionRunnerInfo
}
/**
* Validate each metric is idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
* Validate each metric defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
*/
public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
@ -202,7 +228,7 @@ public class ClientCompactionRunnerInfo
.findFirst()
.map(aggregatorFactory ->
CompactionConfigValidationResult.failure(
"MSQ: Non-idempotent aggregator[%s] not supported in 'metricsSpec'.",
"MSQ: Aggregator[%s] not supported in 'metricsSpec'",
aggregatorFactory.getName()
)
).orElse(CompactionConfigValidationResult.success());
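A usage sketch of the new partition-dimension check above, mirroring the added unit tests (imports as in the test files; the dimension name is illustrative):

List<DimensionSchema> dimensions = ImmutableList.of(new LongDimensionSchema("long_dim"));
PartitionsSpec rangeSpec = new DimensionRangePartitionsSpec(100, null, ImmutableList.of("long_dim"), false);

CompactionConfigValidationResult result =
    ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(rangeSpec, dimensions);

// result.isValid() == false
// result.getReason() ==
//   "MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec"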

File: CompactionStatus.java

@ -25,7 +25,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@ -44,6 +44,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Represents the status of compaction for a given {@link CompactionCandidate}.
@ -230,6 +231,21 @@ public class CompactionStatus
}
}
private static List<DimensionSchema> getNonPartitioningDimensions(
@Nullable final List<DimensionSchema> dimensionSchemas,
@Nullable final PartitionsSpec partitionsSpec
)
{
if (dimensionSchemas == null || !(partitionsSpec instanceof DimensionRangePartitionsSpec)) {
return dimensionSchemas;
}
final List<String> partitionsDimensions = ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
return dimensionSchemas.stream()
.filter(dim -> !partitionsDimensions.contains(dim.getName()))
.collect(Collectors.toList());
}
/**
* Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but
* effectively translates to the same maxRowsPerSegment.
@ -389,18 +405,34 @@ public class CompactionStatus
}
}
/**
* Removes partition dimensions before comparison, since they are placed in front of the sort order --
* which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately
* covered in {@link Evaluator#partitionsSpecIsUpToDate()} check.
*/
private CompactionStatus dimensionsSpecIsUpToDate()
{
if (compactionConfig.getDimensionsSpec() == null) {
return COMPLETE;
} else {
final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
return CompactionStatus.completeIfEqual(
"dimensionsSpec",
compactionConfig.getDimensionsSpec().getDimensions(),
existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(),
String::valueOf
List<DimensionSchema> existingDimensions = getNonPartitioningDimensions(
lastCompactionState.getDimensionsSpec() == null
? null
: lastCompactionState.getDimensionsSpec().getDimensions(),
lastCompactionState.getPartitionsSpec()
);
List<DimensionSchema> configuredDimensions = getNonPartitioningDimensions(
compactionConfig.getDimensionsSpec().getDimensions(),
compactionConfig.getTuningConfig() == null ? null : compactionConfig.getTuningConfig().getPartitionsSpec()
);
{
return CompactionStatus.completeIfEqual(
"dimensionsSpec",
configuredDimensions,
existingDimensions,
String::valueOf
);
}
}
}
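A worked illustration of the comparison above, reproducing the private helper's filtering inline (plain JDK streams plus Guava's ImmutableList; the dimension names match the NewestSegmentFirstPolicyTest case added at the end of this commit):

List<String> partitionDims = ImmutableList.of("dim2", "dim4");
// Dimension order recorded in the last compaction state vs. the order in the compaction config.
List<String> existing = ImmutableList.of("dim2", "dim4", "dim1", "dim3");
List<String> configured = ImmutableList.of("dim1", "dim2", "dim3", "dim4");

// Same filtering as getNonPartitioningDimensions(): drop the range-partition dimensions,
// which 'range' partitioning forces to the front of the segment sort order.
List<String> existingFiltered =
    existing.stream().filter(d -> !partitionDims.contains(d)).collect(Collectors.toList());
List<String> configuredFiltered =
    configured.stream().filter(d -> !partitionDims.contains(d)).collect(Collectors.toList());

// Both reduce to ["dim1", "dim3"], so dimensionsSpec is treated as up to date even though the
// full orderings differ only by the prefixed partition dimensions.
boolean upToDate = existingFiltered.equals(configuredFiltered); // true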

File: CompactSegments.java

@ -86,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
* Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
private static final String COMPACTION_REASON_KEY = "compactionReason";
private static final Logger LOG = new Logger(CompactSegments.class);
@ -567,6 +568,10 @@ public class CompactSegments implements CoordinatorCustomDuty
slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig());
}
if (entry.getCurrentStatus() != null) {
autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason());
}
final String taskId = compactSegments(
entry,
config.getTaskPriority(),
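Item 6 is the small addition in this hunk: when the candidate carries a current compaction status, its reason string is forwarded on the task context. A sketch of the resulting context entries (the keys come from this file; the reason value is illustrative):

Map<String, Object> autoCompactionContext = new HashMap<>();
autoCompactionContext.put("storeCompactionState", true); // existing key, defined above
if (entry.getCurrentStatus() != null) {
  // e.g. a reason such as a dimensionsSpec or partitionsSpec mismatch reported by CompactionStatus
  autoCompactionContext.put("compactionReason", entry.getCurrentStatus().getReason());
}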

File: ClientCompactionRunnerInfoTest.java

@ -21,6 +21,8 @@ package org.apache.druid.client.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@ -36,6 +38,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
@ -45,6 +48,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class ClientCompactionRunnerInfoTest
@ -56,6 +60,7 @@ public class ClientCompactionRunnerInfoTest
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@ -76,6 +81,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@ -96,6 +102,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@ -103,18 +110,40 @@ public class ClientCompactionRunnerInfoTest
}
@Test
public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
public void testMSQEngineWithStringDimensionsInRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
.isValid());
}
@Test
public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
null,
ImmutableList.of(new LongDimensionSchema("partitionDim"))
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-string partition dimension[partitionDim] of type[long] not supported with 'range' partition spec",
validationResult.getReason()
);
}
@Test
public void testMSQEngineWithQueryGranularityAllIsValid()
{
@ -122,6 +151,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
null,
null
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@ -135,7 +165,8 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@ -143,7 +174,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified",
"MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
validationResult.getReason()
);
}
@ -155,6 +186,7 @@ public class ClientCompactionRunnerInfoTest
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, true),
null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@ -163,7 +195,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
"MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
validationResult.getReason()
);
}
@ -177,8 +209,9 @@ public class ClientCompactionRunnerInfoTest
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
new UserCompactionTaskGranularityConfig(null, null, true),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
@ -186,29 +219,38 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
"MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
@Test
public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified",
validationResult.getReason()
);
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
.isValid());
}
private static DataSourceCompactionConfig createMSQCompactionConfig(
PartitionsSpec partitionsSpec,
Map<String, Object> context,
@Nullable UserCompactionTaskGranularityConfig granularitySpec,
@Nullable AggregatorFactory[] metricsSpec
@Nullable AggregatorFactory[] metricsSpec,
List<DimensionSchema> dimensions
)
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@ -219,7 +261,7 @@ public class ClientCompactionRunnerInfoTest
new Period(3600),
createTuningConfig(partitionsSpec),
granularitySpec,
null,
new UserCompactionTaskDimensionsConfig(dimensions),
metricsSpec,
null,
null,

File: NewestSegmentFirstPolicyTest.java

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@ -1137,6 +1138,82 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorDoesNotReturnsSegmentsWhenPartitionDimensionsPrefixed()
{
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Set range partitions spec with dimensions ["dim2", "dim4"] -- the same as what is set in the auto compaction config
PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
null,
Integer.MAX_VALUE,
ImmutableList.of("dim2", "dim4"),
false
);
// Create segments that were compacted (CompactionState != null) and have
// Dimensions=["dim2", "dim4", "dim3", "dim1"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
// Dimensions=["dim2", "dim4", "dim1", "dim3"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
final SegmentTimeline timeline = createTimeline(
createSegments()
.startingAt("2017-10-01")
.withNumPartitions(4)
.withCompactionState(
new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim3", "dim1"))), null, null, indexSpec, null)
),
createSegments()
.startingAt("2017-10-02")
.withNumPartitions(4)
.withCompactionState(
new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim1", "dim3"))), null, null, indexSpec, null)
)
);
// Auto compaction config sets Dimensions=["dim1", "dim2", "dim3", "dim4"] and partition dimensions as ["dim2", "dim4"]
CompactionSegmentIterator iterator = createIterator(
configBuilder().withDimensionsSpec(
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "dim4")))
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
partitionsSpec,
IndexSpec.DEFAULT,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build(),
timeline
);
// We should get only interval 2017-10-01T00:00:00/2017-10-02T00:00:00 since 2017-10-02T00:00:00/2017-10-03T00:00:00
// has dimension order as expected post reordering of partition dimensions.
Assert.assertTrue(iterator.hasNext());
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentFilter() throws Exception
{