emit maxLag/avgLag in KafkaSupervisor (#6587)

* emit maxLag/totalLag/avgLag in KafkaSupervisor

* modify ingest/kafka/totalLag to ingest/kafka/lag for backwards compatibility
This commit is contained in:
Mingming Qiu 2018-11-28 18:11:14 +08:00 committed by Gian Merlino
parent f4b49f01ff
commit c5405bb592
3 changed files with 24 additions and 6 deletions

View File

@ -149,6 +149,8 @@ emission period.|
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
|`ingest/kafka/lag`|Applicable for Kafka Indexing Service. Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
|`ingest/kafka/maxLag`|Applicable for Kafka Indexing Service. Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
|`ingest/kafka/avgLag`|Applicable for Kafka Indexing Service. Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.

View File

@ -99,6 +99,12 @@
"ingest/kafka/lag": [
"dataSource"
],
"ingest/kafka/maxLag": [
"dataSource"
],
"ingest/kafka/avgLag": [
"dataSource"
],
"task/run/time": [
"dataSource"
],

View File

@ -2349,14 +2349,24 @@ public class KafkaSupervisor implements Supervisor
);
}
long lag = getLagPerPartition(highestCurrentOffsets)
.values()
.stream()
.mapToLong(x -> Math.max(x, 0))
.sum();
Map<Integer, Long> partitionLags = getLagPerPartition(highestCurrentOffsets);
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag)
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag)
);
}
catch (Exception e) {