diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index e3270230aaa..d99d3e51f97 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -149,6 +149,8 @@ emission period.| |`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3| |`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event | |`ingest/kafka/lag`|Applicable for Kafka Indexing Service. Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number | +|`ingest/kafka/maxLag`|Applicable for Kafka Indexing Service. Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number | +|`ingest/kafka/avgLag`|Applicable for Kafka Indexing Service. Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number | Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0. diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json index 553e3a2227b..e28639e1690 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json @@ -99,6 +99,12 @@ "ingest/kafka/lag": [ "dataSource" ], + "ingest/kafka/maxLag": [ + "dataSource" + ], + "ingest/kafka/avgLag": [ + "dataSource" + ], "task/run/time": [ "dataSource" ], diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 73fd54ce15e..6992b8dcb28 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -2349,14 +2349,24 @@ public class KafkaSupervisor implements Supervisor ); } - long lag = getLagPerPartition(highestCurrentOffsets) - .values() - .stream() - .mapToLong(x -> Math.max(x, 0)) - .sum(); + Map partitionLags = getLagPerPartition(highestCurrentOffsets); + long maxLag = 0, totalLag = 0, avgLag; + for (long lag : partitionLags.values()) { + if (lag > maxLag) { + maxLag = lag; + } + totalLag += lag; + } + avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size(); emitter.emit( - ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", lag) + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag) + ); + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag) + ); + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag) ); } catch (Exception e) {