NIFI-6723: Enrich processor-related and JVM GC metrics in Prometheus Reporting Task

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3771
This commit is contained in:
Kotaro Terada 2019-09-27 09:53:39 +09:00 committed by Matthew Burgess
parent 02d3b7e92b
commit d7ca37d065
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24

View File

@ -20,6 +20,7 @@ package org.apache.nifi.reporting.prometheus.api;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
@ -47,7 +48,7 @@ public class PrometheusMetricsUtil {
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
// Process Group metrics
// Processor / Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
@ -66,6 +67,12 @@ public class PrometheusMetricsUtil {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build()
.name("nifi_amount_flowfiles_removed")
.help("Total number of FlowFiles removed by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
@ -150,6 +157,7 @@ public class PrometheusMetricsUtil {
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Processor metrics
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
.name("nifi_processor_counters")
.help("Counters exposed by NiFi Processors")
@ -252,6 +260,18 @@ public class PrometheusMetricsUtil {
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_RUNS = Gauge.build()
.name("nifi_jvm_gc_runs")
.help("NiFi JVM GC number of runs")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_TIME = Gauge.build()
.name("nifi_jvm_gc_time")
.help("NiFi JVM GC time in milliseconds")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
final String componentId = status.getId();
@ -305,10 +325,10 @@ public class PrometheusMetricsUtil {
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
// Report metrics for all components
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
Map<String, Long> counters = processorStatus.getCounters();
if(counters != null) {
if (counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
}
@ -317,13 +337,36 @@ public class PrometheusMetricsUtil {
final String procComponentId = processorStatus.getId();
final String procComponentName = processorStatus.getName();
final String parentId = processorStatus.getGroupId();
AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesReceived());
AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesRemoved());
AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getOutputBytes());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getInputBytes());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getInputCount());
AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getAverageLineageDuration());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
final String connComponentId = connectionStatus.getId();
final String connComponentName = connectionStatus.getName();
final String sourceId = connectionStatus.getSourceId();
@ -355,7 +398,7 @@ public class PrometheusMetricsUtil {
IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(isBackpressureEnabled ? 1 : 0);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
for (PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
@ -379,7 +422,7 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
for (PortStatus portStatus : status.getOutputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
@ -403,7 +446,7 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
final String rpgComponentId = remoteProcessGroupStatus.getId();
final String rpgComponentName = remoteProcessGroupStatus.getName();
final String parentId = remoteProcessGroupStatus.getGroupId();
@ -446,6 +489,12 @@ public class PrometheusMetricsUtil {
JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
jvmMetrics.garbageCollectors()
.forEach((name, stat) -> {
JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns());
JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS));
});
return JVM_REGISTRY;
}