Adding extra debug logs for the checkpoint logic (#16321)

Logging to understand checkpointing better in streaming ingestion
This commit is contained in:
PANKAJ KUMAR 2024-09-24 09:38:46 +05:30 committed by GitHub
parent 8eaac2c051
commit 36dfff4b1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 6 additions and 0 deletions

View File

@ -627,6 +627,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
stillReading = !assignment.isEmpty();
SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToCheckpoint = null;
AppenderatorDriverAddResult pushTriggeringAddResult = null;
for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType> record : records) {
final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
@ -677,6 +678,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired && !sequenceToUse.isCheckpointed()) {
pushTriggeringAddResult = addResult;
sequenceToCheckpoint = sequenceToUse;
}
isPersistRequired |= addResult.isPersistRequired();
@ -739,6 +741,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (System.currentTimeMillis() > nextCheckpointTime) {
sequenceToCheckpoint = getLastSequenceMetadata();
log.info("Next checkpoint time, updating sequenceToCheckpoint, SequenceToCheckpoint: [%s]", sequenceToCheckpoint);
}
if (pushTriggeringAddResult != null) {
log.info("Hit the row limit updating sequenceToCheckpoint, SequenceToCheckpoint: [%s], rowInSegment: [%s], TotalRows: [%s]", sequenceToCheckpoint, pushTriggeringAddResult.getNumRowsInSegment(), pushTriggeringAddResult.getTotalNumRowsInAppenderator());
}
if (sequenceToCheckpoint != null && stillReading) {