Enhance streaming ingestion metrics (#13331)

Changes:
- Add a metric for partition-wise kafka/kinesis lag for streaming ingestion.
- Emit lag metrics for streaming ingestion when supervisor is not suspended and state is in {RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}
- Document metrics
This commit is contained in:
AmatyaAvadhanula 2022-11-09 23:44:15 +05:30 committed by GitHub
parent b7a513fe09
commit a2013e6566
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 59 additions and 21 deletions

View File

@ -183,9 +183,10 @@ These metrics apply to the [Kafka indexing service](../development/extensions-co
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, should not be a very high number |
|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, should not be a very high number |
### Ingestion metrics for Kinesis
@ -193,9 +194,10 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|dataSource, stream.|Greater than 0, up to max Kinesis retention period in milliseconds |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|dataSource, stream, partition.|Greater than 0, up to max Kinesis retention period in milliseconds |
### Other ingestion metrics
@ -223,6 +225,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
|`ingest/handoff/time`|Total time taken for each set of segments handed off.|dataSource, taskId, taskType.|Depends on coordinator cycle time|
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.

View File

@ -102,13 +102,21 @@
"dataSource"
],
"ingest/kafka/lag": [
"dataSource"
"dataSource",
"stream"
],
"ingest/kafka/maxLag": [
"dataSource"
"dataSource",
"stream"
],
"ingest/kafka/avgLag": [
"dataSource"
"dataSource",
"stream"
],
"ingest/kafka/partitionLag": [
"dataSource",
"stream",
"partition"
],
"task/run/time": [
"dataSource"

View File

@ -53,9 +53,10 @@
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000.0, "help": "Seconds spent merging intermediate segments" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer", "conversionFactor": 1000000000.0, "help": "Cpu time in Seconds spent on merging intermediate segments."},
"ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge", "help": "Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute."},
"ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge", "help": "Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute."},
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included."},

View File

@ -53,9 +53,10 @@
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
"ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
"ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" },
"task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },

View File

@ -86,6 +86,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@ -4092,9 +4093,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected void emitLag()
{
if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle())) {
// don't emit metrics if supervisor is suspended or not in a healthy running state
// (lag should still available in status report)
SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState();
boolean unhealthySupervisorOrTasks = SupervisorStateManager.BasicState.UNHEALTHY_TASKS.equals(basicState)
|| SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR.equals(basicState);
if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle() || unhealthySupervisorOrTasks)) {
// Don't emit metrics if the supervisor is suspended. Also,
// to emit metrics, the state must be in {healthy steady state, idle or UNHEALTHY_TASKS or UNHEALTHY_SUPERVISOR}
// (lag should still be available in the status report)
return;
}
try {
@ -4112,19 +4118,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
LagStats lagStats = computeLags(partitionLags);
for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimension(DruidMetrics.PARTITION, entry.getKey())
.build(
StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
entry.getValue()
)
);
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
);
};

View File

@ -48,6 +48,10 @@ public class DruidMetrics
public static final String DUTY = "duty";
public static final String DUTY_GROUP = "dutyGroup";
public static final String STREAM = "stream";
public static final String PARTITION = "partition";
public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{
int retVal = 0;