mirror of https://github.com/apache/druid.git
Compute `range` partitionsSpec using effective `maxRowsPerSegment` (#16987)
In the compaction config, a range-type partitionsSpec supports setting either maxRowsPerSegment or targetRowsPerSegment. When compaction runs with the native engine, maxRowsPerSegment = x results in segments of size x, whereas targetRowsPerSegment = y results in segments of size 1.5 * y. MSQ supports only rowsPerSegment = x as part of its tuning config, and the resulting segment size is approximately x, which is in line with the maxRowsPerSegment behaviour in native compaction. This PR makes the following changes: (1) use the effective maxRowsPerSegment as the rowsPerSegment parameter for MSQ; (2) persist rowsPerSegment as maxRowsPerSegment in lastCompactionState for MSQ; (3) use the effective maxRowsPerSegment-based range spec in the CompactionStatus check for both Native and MSQ.
This commit is contained in:
parent
b7a21a9f67
commit
37d4174245
|
@ -1603,9 +1603,10 @@ public class ControllerImpl implements Controller
|
|||
if (shardSpec != null) {
|
||||
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
|
||||
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
|
||||
// Effective maxRowsPerSegment is propagated as rowsPerSegment in MSQ
|
||||
partitionSpec = new DimensionRangePartitionsSpec(
|
||||
tuningConfig.getRowsPerSegment(),
|
||||
null,
|
||||
tuningConfig.getRowsPerSegment(),
|
||||
partitionDimensions,
|
||||
false
|
||||
);
|
||||
|
@ -1623,9 +1624,10 @@ public class ControllerImpl implements Controller
|
|||
)));
|
||||
}
|
||||
} else if (clusterBy != null && !clusterBy.getColumns().isEmpty()) {
|
||||
// Effective maxRowsPerSegment is propagated as rowsPerSegment in MSQ
|
||||
partitionSpec = new DimensionRangePartitionsSpec(
|
||||
tuningConfig.getRowsPerSegment(),
|
||||
null,
|
||||
tuningConfig.getRowsPerSegment(),
|
||||
clusterBy.getColumns()
|
||||
.stream()
|
||||
.map(KeyColumn::columnName).collect(Collectors.toList()),
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
|
|||
import org.apache.druid.data.input.impl.DimensionSchema;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
|
@ -286,19 +285,10 @@ public class MSQCompactionRunner implements CompactionRunner
|
|||
|
||||
private static Integer getRowsPerSegment(CompactionTask compactionTask)
|
||||
{
|
||||
Integer rowsPerSegment = PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
|
||||
if (compactionTask.getTuningConfig() != null) {
|
||||
PartitionsSpec partitionsSpec = compactionTask.getTuningConfig().getPartitionsSpec();
|
||||
if (partitionsSpec instanceof DynamicPartitionsSpec) {
|
||||
rowsPerSegment = partitionsSpec.getMaxRowsPerSegment();
|
||||
} else if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
|
||||
DimensionRangePartitionsSpec dimensionRangePartitionsSpec = (DimensionRangePartitionsSpec) partitionsSpec;
|
||||
rowsPerSegment = dimensionRangePartitionsSpec.getTargetRowsPerSegment() != null
|
||||
? dimensionRangePartitionsSpec.getTargetRowsPerSegment()
|
||||
: dimensionRangePartitionsSpec.getMaxRowsPerSegment();
|
||||
}
|
||||
if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
|
||||
return compactionTask.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
|
||||
}
|
||||
return rowsPerSegment;
|
||||
return PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
|
||||
}
|
||||
|
||||
private static RowSignature getRowSignature(DataSchema dataSchema)
|
||||
|
|
|
@ -2643,8 +2643,8 @@ public class MSQReplaceTest extends MSQTestBase
|
|||
);
|
||||
} else {
|
||||
partitionsSpec = new DimensionRangePartitionsSpec(
|
||||
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
|
||||
null,
|
||||
MultiStageQueryContext.getRowsPerSegment(QueryContext.of(context)),
|
||||
partitionDimensions,
|
||||
false
|
||||
);
|
||||
|
|
|
@ -93,6 +93,7 @@ public class MSQCompactionRunnerTest
|
|||
|
||||
private static final String TIMESTAMP_COLUMN = ColumnHolder.TIME_COLUMN_NAME;
|
||||
private static final int TARGET_ROWS_PER_SEGMENT = 100000;
|
||||
private static final int MAX_ROWS_PER_SEGMENT = 150000;
|
||||
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
|
||||
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
|
||||
private static List<String> PARTITION_DIMENSIONS;
|
||||
|
@ -286,7 +287,7 @@ public class MSQCompactionRunnerTest
|
|||
new MSQTuningConfig(
|
||||
1,
|
||||
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
|
||||
TARGET_ROWS_PER_SEGMENT,
|
||||
MAX_ROWS_PER_SEGMENT,
|
||||
null,
|
||||
createIndexSpec()
|
||||
),
|
||||
|
@ -326,7 +327,7 @@ public class MSQCompactionRunnerTest
|
|||
DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);
|
||||
|
||||
CompactionTask taskCreatedWithTransformSpec = createCompactionTask(
|
||||
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
|
||||
new DimensionRangePartitionsSpec(null, MAX_ROWS_PER_SEGMENT, PARTITION_DIMENSIONS, false),
|
||||
dimFilter,
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
|
@ -364,7 +365,7 @@ public class MSQCompactionRunnerTest
|
|||
new MSQTuningConfig(
|
||||
1,
|
||||
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
|
||||
TARGET_ROWS_PER_SEGMENT,
|
||||
MAX_ROWS_PER_SEGMENT,
|
||||
null,
|
||||
createIndexSpec()
|
||||
),
|
||||
|
|
|
@ -692,16 +692,26 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
|
||||
LOG.info("Auto compaction test with range partitioning");
|
||||
|
||||
final DimensionRangePartitionsSpec rangePartitionsSpec = new DimensionRangePartitionsSpec(
|
||||
final DimensionRangePartitionsSpec inputRangePartitionsSpec = new DimensionRangePartitionsSpec(
|
||||
5,
|
||||
null,
|
||||
ImmutableList.of("city"),
|
||||
false
|
||||
);
|
||||
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
|
||||
DimensionRangePartitionsSpec expectedRangePartitionsSpec = inputRangePartitionsSpec;
|
||||
if (engine == CompactionEngine.MSQ) {
|
||||
// Range spec is transformed to its effective maxRowsPerSegment equivalent in MSQ
|
||||
expectedRangePartitionsSpec = new DimensionRangePartitionsSpec(
|
||||
null,
|
||||
7,
|
||||
ImmutableList.of("city"),
|
||||
false
|
||||
);
|
||||
}
|
||||
submitCompactionConfig(inputRangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine);
|
||||
forceTriggerAutoCompaction(2);
|
||||
verifyQuery(INDEX_QUERIES_RESOURCE);
|
||||
verifySegmentsCompacted(rangePartitionsSpec, 2);
|
||||
verifySegmentsCompacted(expectedRangePartitionsSpec, 2);
|
||||
checkCompactionIntervals(intervalsBeforeCompaction);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -223,11 +223,27 @@ public class CompactionStatus
|
|||
partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
|
||||
((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
|
||||
);
|
||||
} else if (partitionsSpecFromTuningConfig instanceof DimensionRangePartitionsSpec) {
|
||||
return getEffectiveRangePartitionsSpec((DimensionRangePartitionsSpec) partitionsSpecFromTuningConfig);
|
||||
} else {
|
||||
return partitionsSpecFromTuningConfig;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the given range spec into an equivalent spec that carries only the effective maxRowsPerSegment, so that a
|
||||
* spec configured with targetRowsPerSegment is not reported as changed when it resolves to the same maxRowsPerSegment.
|
||||
*/
|
||||
static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRangePartitionsSpec partitionsSpec)
|
||||
{
|
||||
return new DimensionRangePartitionsSpec(
|
||||
null,
|
||||
partitionsSpec.getMaxRowsPerSegment(),
|
||||
partitionsSpec.getPartitionDimensions(),
|
||||
partitionsSpec.isAssumeGrouped()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates {@link #CHECKS} to determine the compaction status.
|
||||
*/
|
||||
|
@ -286,10 +302,14 @@ public class CompactionStatus
|
|||
|
||||
private CompactionStatus partitionsSpecIsUpToDate()
|
||||
{
|
||||
PartitionsSpec existingPartionsSpec = lastCompactionState.getPartitionsSpec();
|
||||
if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) {
|
||||
existingPartionsSpec = getEffectiveRangePartitionsSpec((DimensionRangePartitionsSpec) existingPartionsSpec);
|
||||
}
|
||||
return CompactionStatus.completeIfEqual(
|
||||
"partitionsSpec",
|
||||
findPartitionsSpecFromConfig(tuningConfig),
|
||||
lastCompactionState.getPartitionsSpec(),
|
||||
existingPartionsSpec,
|
||||
CompactionStatus::asString
|
||||
);
|
||||
}
|
||||
|
|
|
@ -162,7 +162,7 @@ public class CompactionStatusTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFindPartitionsSpecWhenGivenIsRange()
|
||||
public void testFindPartitionsSpecWhenGivenIsRangeWithMaxRows()
|
||||
{
|
||||
final PartitionsSpec partitionsSpec =
|
||||
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
|
||||
|
@ -174,6 +174,19 @@ public class CompactionStatusTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindPartitionsSpecWhenGivenIsRangeWithTargetRows()
|
||||
{
|
||||
final PartitionsSpec partitionsSpec =
|
||||
new DimensionRangePartitionsSpec(10000, null, Collections.singletonList("dim"), false);
|
||||
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||
Assert.assertEquals(
|
||||
new DimensionRangePartitionsSpec(null, 15000, Collections.singletonList("dim"), false),
|
||||
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStatusWhenLastCompactionStateIsNull()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue