From cd10435b9fa5ab180c80ef5c04069971b32b0fa9 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 22 Apr 2020 10:37:04 -0400 Subject: [PATCH] NIFI-7378: Ensure label values are not null in Prometheus metrics (#4219) --- .../nifi-prometheus-utils/pom.xml | 5 ++ .../util/PrometheusMetricsUtil.java | 81 ++++++++++++------- .../TestPrometheusReportingTask.java | 23 ++++++ 3 files changed, 81 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml index b2f8b912dc..369a8257f1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml @@ -43,6 +43,11 @@ nifi-utils 1.12.0-SNAPSHOT + + org.apache.nifi + nifi-properties + 1.12.0-SNAPSHOT + org.apache.nifi nifi-metrics diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index 01b91a8c55..967b75d0cf 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -42,6 +42,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.metrics.jvm.JvmMetrics; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; public class PrometheusMetricsUtil { @@ -59,6 +60,8 @@ public class PrometheusMetricsUtil { public static final Collection ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY); + protected static final String DEFAULT_LABEL_STRING = ""; + // Common properties/values public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", "ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously"); @@ -368,10 +371,13 @@ public class PrometheusMetricsUtil { .labelNames("instance", "gc_name") .register(JVM_REGISTRY); - public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) { + public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instId, String parentProcessGroupId, String compType, String metricsStrategy) { - final String componentId = status.getId(); - final String componentName = status.getName(); + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId; + final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType; + final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId(); + final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName(); // Clear all collectors to deal with removed/renamed components -- for root PG only if("RootProcessGroup".equals(componentType)) { @@ -434,9 +440,9 @@ public class PrometheusMetricsUtil { } final String procComponentType = "Processor"; - final String procComponentId = processorStatus.getId(); - final String procComponentName = processorStatus.getName(); - final String parentId = processorStatus.getGroupId(); + final String procComponentId = StringUtils.isEmpty(processorStatus.getId()) ? DEFAULT_LABEL_STRING : processorStatus.getId(); + final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName(); + final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId(); AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesSent()); AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesReceived()); @@ -469,13 +475,13 @@ public class PrometheusMetricsUtil { } for (ConnectionStatus connectionStatus : status.getConnectionStatus()) { - final String connComponentId = connectionStatus.getId(); - final String connComponentName = connectionStatus.getName(); - final String sourceId = connectionStatus.getSourceId(); - final String sourceName = connectionStatus.getSourceName(); - final String destinationId = connectionStatus.getDestinationId(); - final String destinationName = connectionStatus.getDestinationName(); - final String parentId = connectionStatus.getGroupId(); + final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId(); + final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName(); + final String sourceId = StringUtils.isEmpty(connectionStatus.getSourceId()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceId(); + final String sourceName = StringUtils.isEmpty(connectionStatus.getSourceName()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceName(); + final String destinationId = StringUtils.isEmpty(connectionStatus.getDestinationId()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationId(); + final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName(); + final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId(); final String connComponentType = "Connection"; SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) .set(connectionStatus.getOutputBytes()); @@ -501,9 +507,9 @@ public class PrometheusMetricsUtil { .set(isBackpressureEnabled ? 1 : 0); } for (PortStatus portStatus : status.getInputPortStatus()) { - final String portComponentId = portStatus.getId(); - final String portComponentName = portStatus.getName(); - final String parentId = portStatus.getGroupId(); + final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); + final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId(); + final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String portComponentType = "InputPort"; AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); @@ -527,9 +533,9 @@ public class PrometheusMetricsUtil { AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); } for (PortStatus portStatus : status.getOutputPortStatus()) { - final String portComponentId = portStatus.getId(); - final String portComponentName = portStatus.getName(); - final String parentId = portStatus.getGroupId(); + final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); + final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName(); + final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId(); final String portComponentType = "OutputPort"; AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); @@ -553,9 +559,9 @@ public class PrometheusMetricsUtil { AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); } for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { - final String rpgComponentId = remoteProcessGroupStatus.getId(); - final String rpgComponentName = remoteProcessGroupStatus.getName(); - final String parentId = remoteProcessGroupStatus.getGroupId(); + final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId(); + final String rpgComponentName = StringUtils.isEmpty(remoteProcessGroupStatus.getName()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getName(); + final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId(); final String rpgComponentType = "RemoteProcessGroup"; AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize()); @@ -584,7 +590,8 @@ public class PrometheusMetricsUtil { return NIFI_REGISTRY; } - public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) { + public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instId) { + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B)); JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage()); JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage()); @@ -604,9 +611,19 @@ public class PrometheusMetricsUtil { return JVM_REGISTRY; } - public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName, - String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) { + public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instId, String connComponentType, String connName, + String connId, String pgId, String srcId, String srcName, String destId, String destName) { if(statusAnalytics != null) { + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + final String connComponentId = StringUtils.isEmpty(connId) ? DEFAULT_LABEL_STRING : connId; + final String connComponentName = StringUtils.isEmpty(connName) ? DEFAULT_LABEL_STRING : connName; + final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId; + final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName; + final String destinationId = StringUtils.isEmpty(destId) ? DEFAULT_LABEL_STRING : destId; + final String destinationName = StringUtils.isEmpty(destName) ? DEFAULT_LABEL_STRING : destName; + final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId; + + Map predictions = statusAnalytics.getPredictions(); TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) .set(predictions.get("timeToBytesBackpressureMillis")); @@ -621,9 +638,17 @@ public class PrometheusMetricsUtil { return CONNECTION_ANALYTICS_REGISTRY; } - public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress, - String category, String sourceName, String sourceId, String level) { - + public static CollectorRegistry createBulletinMetrics(String instId, String compType, String compId, String pgId, String nodeAddr, + String cat, String srcName, String srcId, String lvl) { + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType; + final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId; + final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId; + final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName; + final String nodeAddress = StringUtils.isEmpty(nodeAddr) ? DEFAULT_LABEL_STRING : nodeAddr; + final String category = StringUtils.isEmpty(cat) ? DEFAULT_LABEL_STRING : cat; + final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId; + final String level = StringUtils.isEmpty(lvl) ? DEFAULT_LABEL_STRING : lvl; BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1); return BULLETIN_REGISTRY; } diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java index 381c68fbbc..77e3f7ddc4 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java @@ -145,6 +145,11 @@ public class TestPrometheusReportingTask { "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0")); Assert.assertTrue(content.contains( "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0")); + try { + testedReportingTask.OnStopped(); + } catch (Exception e) { + // Ignore + } } private String getMetrics() throws IOException { @@ -160,4 +165,22 @@ public class TestPrometheusReportingTask { HttpEntity entity = response.getEntity(); return EntityUtils.toString(entity); } + + @Test + public void testNullLabel() throws IOException, InitializationException { + rootGroupStatus.setName(null); + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.onScheduled(configurationContextStub); + reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus); + testedReportingTask.onTrigger(reportingContextStub); + + String content = getMetrics(); + Assert.assertTrue(content.contains("parent_id=\"\"")); + + try { + testedReportingTask.OnStopped(); + } catch(Exception e) { + // Ignore + } + } }