Segments created in the same batch have the same `created_date` entry & rename metric (#15977)

* All segments stored in the same batch have the same created_date entry.

In the absence of a group_id column, this metadata would allow us to easily
reason about and troubleshoot ingestion-related issues.

* Rename metric name and code references to eligibleUnusedSegments.

Address review comment from https://github.com/apache/druid/pull/15941#discussion_r1503631992
This commit is contained in:
Abhishek Radhakrishnan 2024-02-27 17:28:43 +05:30 committed by GitHub
parent 5bb5b41b18
commit beccc401e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 35 additions and 34 deletions

View File

@ -342,7 +342,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies|
|`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies|
|`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies|
|`kill/candidateUnusedSegments/count`|The number of candidate unused segments eligible for deletion from the metadata store during an auto kill run for a datasource.|`dataSource`|Varies|
|`kill/eligibleUnusedSegments/count`|The number of unused segments of a datasource that are identified as eligible for deletion from the metadata store by the coordinator.|`dataSource`|Varies|
|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|

View File

@ -1372,6 +1372,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Map<SegmentIdWithShardSpec, SegmentCreateRequest> segmentIdToRequest = new HashMap<>();
createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
final String now = DateTimes.nowUtc().toString();
for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : segmentIdToRequest.entrySet()) {
final SegmentCreateRequest request = entry.getValue();
final SegmentIdWithShardSpec segmentId = entry.getKey();
@ -1380,7 +1381,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
insertBatch.add()
.bind("id", segmentId.toString())
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("created_date", now)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", request.getSequenceName())
@ -1977,10 +1978,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
final String now = DateTimes.nowUtc().toString();
PreparedBatch preparedBatch = handle.prepareBatch(buildSqlToInsertSegments());
for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
final String now = DateTimes.nowUtc().toString();
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
@ -2150,10 +2151,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
final String now = DateTimes.nowUtc().toString();
final PreparedBatch batch = handle.prepareBatch(buildSqlToInsertSegments());
for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
final String now = DateTimes.nowUtc().toString();
batch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())

View File

@ -275,10 +275,10 @@ public class KillUnusedSegments implements CoordinatorDuty
);
// Each unused segment interval returned above has a 1:1 correspondence with an unused segment. So we can assume
// these are candidate segments eligible for deletion by the kill task. After the umbrella interval is computed
// these are eligible segments for deletion by the kill task. After the umbrella interval is computed
// below, we cannot say the same as there can be multiple unused segments with different usedStatusLastUpdatedTime.
final RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
stats.add(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
stats.add(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
return null;

View File

@ -149,8 +149,8 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
public static final CoordinatorStat SUBMITTED_TASKS
= CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
public static final CoordinatorStat CANDIDATE_UNUSED_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killCandidateUnusedSegs", "kill/candidateUnusedSegments/count");
public static final CoordinatorStat ELIGIBLE_UNUSED_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killEligibleUnusedSegs", "kill/eligibleUnusedSegments/count");
public static final CoordinatorStat PENDING_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count");
}

View File

@ -129,7 +129,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, YEAR_OLD);
}
@ -171,8 +171,8 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
@ -182,8 +182,8 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(20, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(20, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
validateLastKillStateAndReset(DS2, NEXT_DAY);
@ -193,8 +193,8 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(30, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(30, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
validateLastKillStateAndReset(DS1, NEXT_MONTH);
validateLastKillStateAndReset(DS2, null);
@ -226,7 +226,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
}
@ -252,7 +252,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
@ -266,7 +266,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(20, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(20, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, NEXT_MONTH);
@ -275,7 +275,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(30, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(30, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, null);
@ -284,7 +284,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(40, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(40, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
}
@ -305,9 +305,9 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(0, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS3_STAT_KEY));
Assert.assertEquals(0, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
validateLastKillStateAndReset(DS1, null);
validateLastKillStateAndReset(DS2, YEAR_OLD);
@ -332,7 +332,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_DAY.getEnd())
);
@ -356,7 +356,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(6, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(6, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
// All past and future unused segments should be killed
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
@ -377,7 +377,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, YEAR_OLD);
}
@ -403,7 +403,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
@ -412,7 +412,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, null);
}
@ -444,8 +444,8 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
validateLastKillStateAndReset(DS2, YEAR_OLD);
@ -579,7 +579,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, firstHalfEternity);
}
@ -608,7 +608,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
}
@ -639,7 +639,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, secondHalfEternity);
}
@ -659,7 +659,7 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, YEAR_OLD);
}