Improve task pause logging and metrics for streaming ingestion (#13313)

* Improve task pause logging and metrics for streaming ingestion

* Add metrics doc

* Fix spelling
This commit is contained in:
AmatyaAvadhanula 2022-11-07 21:33:54 +05:30 committed by GitHub
parent b1eaf7a21f
commit a738ac9ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 4 deletions

View File

@ -221,7 +221,8 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/sink/count`|Number of sinks not handed off.|dataSource, taskId, taskType.|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the supervisor|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource, noticeType.| < 1s. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

View File

@ -1311,14 +1311,16 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.PAUSED;
hasPaused.signalAll();
log.debug("Received pause command, pausing ingestion until resumed.");
long pauseTime = System.currentTimeMillis();
log.info("Received pause command, pausing ingestion until resumed.");
while (pauseRequested) {
shouldResume.await();
}
status = Status.READING;
shouldResume.signalAll();
log.debug("Received resume command, resuming ingestion.");
log.info("Received resume command, resuming ingestion.");
task.emitMetric(toolbox.getEmitter(), "ingest/pause/time", System.currentTimeMillis() - pauseTime);
return true;
}
}

View File

@ -3064,7 +3064,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
log.info(
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
"Setting endOffsets for tasks in taskGroup [%d] to %s",
taskGroup.groupId,
endOffsets
);
@ -3083,6 +3083,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
taskGroup.tasks.remove(taskId);
} else {
log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
}
}
}