mirror of https://github.com/apache/druid.git
compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11510)
* fix compaction status api * fix checkstyle * address comment
This commit is contained in:
parent
995d99d9e4
commit
05a7da792f
|
@ -48,7 +48,6 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -108,9 +107,7 @@ public class CompactSegments implements CoordinatorDuty
|
|||
|
||||
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
|
||||
final CoordinatorStats stats = new CoordinatorStats();
|
||||
final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
|
||||
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
|
||||
updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
|
||||
if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
|
||||
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
|
||||
params.getUsedSegmentsTimelinesPerDataSource();
|
||||
|
@ -197,7 +194,7 @@ public class CompactSegments implements CoordinatorDuty
|
|||
);
|
||||
stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
|
||||
stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
|
||||
|
||||
final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
|
||||
if (numAvailableCompactionTaskSlots > 0) {
|
||||
stats.accumulate(
|
||||
doRun(
|
||||
|
@ -303,23 +300,6 @@ public class CompactSegments implements CoordinatorDuty
|
|||
return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
|
||||
}
|
||||
|
||||
private void updateAutoCompactionSnapshot(
|
||||
List<DataSourceCompactionConfig> compactionConfigList,
|
||||
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
|
||||
{
|
||||
|
||||
Set<String> enabledDatasources = compactionConfigList.stream()
|
||||
.map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
|
||||
.collect(Collectors.toSet());
|
||||
// Create and Update snapshot for dataSource that has auto compaction enabled
|
||||
for (String compactionConfigDataSource : enabledDatasources) {
|
||||
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
||||
compactionConfigDataSource,
|
||||
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
|
||||
{
|
||||
return taskStatuses
|
||||
|
@ -351,7 +331,11 @@ public class CompactSegments implements CoordinatorDuty
|
|||
if (!segmentsToCompact.isEmpty()) {
|
||||
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
|
||||
// As these segments will be compacted, we will aggregates the statistic to the Compacted statistics
|
||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
|
||||
|
||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
||||
dataSourceName,
|
||||
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
|
||||
);
|
||||
snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
|
||||
snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
|
||||
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
|
||||
|
@ -426,7 +410,10 @@ public class CompactSegments implements CoordinatorDuty
|
|||
final List<DataSegment> segmentsToCompact = iterator.next();
|
||||
if (!segmentsToCompact.isEmpty()) {
|
||||
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
|
||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
|
||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
||||
dataSourceName,
|
||||
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
|
||||
);
|
||||
snapshotBuilder.incrementBytesAwaitingCompaction(
|
||||
segmentsToCompact.stream()
|
||||
.mapToLong(DataSegment::getSize)
|
||||
|
@ -444,26 +431,35 @@ public class CompactSegments implements CoordinatorDuty
|
|||
|
||||
// Statistics of all segments considered compacted after this run
|
||||
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
|
||||
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
|
||||
final String dataSource = compactionStatisticsEntry.getKey();
|
||||
final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
|
||||
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
||||
dataSource,
|
||||
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
|
||||
);
|
||||
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
|
||||
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
|
||||
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
|
||||
}
|
||||
|
||||
// Statistics of all segments considered skipped after this run
|
||||
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
|
||||
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
|
||||
final String dataSource = compactionStatisticsEntry.getKey();
|
||||
final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
|
||||
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
||||
dataSource,
|
||||
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
|
||||
);
|
||||
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
|
||||
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
|
||||
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
|
||||
}
|
||||
|
||||
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
|
||||
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
|
||||
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
|
||||
CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
|
||||
CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
|
||||
|
||||
if (dataSourceCompactedStatistics != null) {
|
||||
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
|
||||
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
|
||||
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
|
||||
}
|
||||
|
||||
if (dataSourceSkippedStatistics != null) {
|
||||
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
|
||||
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
|
||||
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
|
||||
}
|
||||
|
||||
// Build the complete snapshot for the datasource
|
||||
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
|
||||
|
|
|
@ -315,7 +315,7 @@ public class CompactSegmentsTest
|
|||
Assert.assertEquals(0, autoCompactionSnapshots.size());
|
||||
|
||||
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
|
||||
assertCompactSegmentStatistics(compactSegments, compactionRunCount);
|
||||
doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
|
||||
}
|
||||
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
|
||||
final CoordinatorStats stats = doCompactSegments(compactSegments);
|
||||
|
@ -465,6 +465,73 @@ public class CompactSegmentsTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMakeStatsWithDeactivatedDatasource()
|
||||
{
|
||||
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
|
||||
leaderClient.start();
|
||||
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
|
||||
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
|
||||
|
||||
// Before any compaction, we do not have any snapshot of compactions
|
||||
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
|
||||
Assert.assertEquals(0, autoCompactionSnapshots.size());
|
||||
|
||||
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
|
||||
doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
|
||||
}
|
||||
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
|
||||
final CoordinatorStats stats = doCompactSegments(compactSegments);
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
|
||||
);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
verifySnapshot(
|
||||
compactSegments,
|
||||
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
|
||||
DATA_SOURCE_PREFIX + i,
|
||||
0,
|
||||
TOTAL_BYTE_PER_DATASOURCE,
|
||||
0,
|
||||
0,
|
||||
TOTAL_INTERVAL_PER_DATASOURCE,
|
||||
0,
|
||||
0,
|
||||
TOTAL_SEGMENT_PER_DATASOURCE / 2,
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
// Deactivate one datasource (datasource 0 no longer exist in timeline)
|
||||
dataSources.remove(DATA_SOURCE_PREFIX + 0);
|
||||
|
||||
// Test run auto compaction with one datasource deactivated
|
||||
// Snapshot should not contain deactivated datasource
|
||||
doCompactSegments(compactSegments);
|
||||
for (int i = 1; i < 3; i++) {
|
||||
verifySnapshot(
|
||||
compactSegments,
|
||||
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
|
||||
DATA_SOURCE_PREFIX + i,
|
||||
0,
|
||||
TOTAL_BYTE_PER_DATASOURCE,
|
||||
0,
|
||||
0,
|
||||
TOTAL_INTERVAL_PER_DATASOURCE,
|
||||
0,
|
||||
0,
|
||||
TOTAL_SEGMENT_PER_DATASOURCE / 2,
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, compactSegments.getAutoCompactionSnapshot().size());
|
||||
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 1));
|
||||
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 2));
|
||||
Assert.assertFalse(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMakeStatsForDataSourceWithSkipped()
|
||||
{
|
||||
|
@ -1025,7 +1092,7 @@ public class CompactSegmentsTest
|
|||
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
|
||||
}
|
||||
|
||||
private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
|
||||
private void doCompactionAndAssertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
|
||||
{
|
||||
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
|
||||
// One compaction task triggered
|
||||
|
|
Loading…
Reference in New Issue