diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
index b2f8b912dc..369a8257f1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/pom.xml
@@ -43,6 +43,11 @@
nifi-utils
1.12.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-properties
+ 1.12.0-SNAPSHOT
+
org.apache.nifi
nifi-metrics
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index 01b91a8c55..967b75d0cf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -42,6 +42,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
public class PrometheusMetricsUtil {
@@ -59,6 +60,8 @@ public class PrometheusMetricsUtil {
public static final Collection ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
+ protected static final String DEFAULT_LABEL_STRING = "";
+
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
@@ -368,10 +371,13 @@ public class PrometheusMetricsUtil {
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
- public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
+ public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
- final String componentId = status.getId();
- final String componentName = status.getName();
+ final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
+ final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
+ final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
+ final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
+ final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();
// Clear all collectors to deal with removed/renamed components -- for root PG only
if("RootProcessGroup".equals(componentType)) {
@@ -434,9 +440,9 @@ public class PrometheusMetricsUtil {
}
final String procComponentType = "Processor";
- final String procComponentId = processorStatus.getId();
- final String procComponentName = processorStatus.getName();
- final String parentId = processorStatus.getGroupId();
+ final String procComponentId = StringUtils.isEmpty(processorStatus.getId()) ? DEFAULT_LABEL_STRING : processorStatus.getId();
+ final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName();
+ final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId();
AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesReceived());
@@ -469,13 +475,13 @@ public class PrometheusMetricsUtil {
}
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
- final String connComponentId = connectionStatus.getId();
- final String connComponentName = connectionStatus.getName();
- final String sourceId = connectionStatus.getSourceId();
- final String sourceName = connectionStatus.getSourceName();
- final String destinationId = connectionStatus.getDestinationId();
- final String destinationName = connectionStatus.getDestinationName();
- final String parentId = connectionStatus.getGroupId();
+ final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId();
+ final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName();
+ final String sourceId = StringUtils.isEmpty(connectionStatus.getSourceId()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceId();
+ final String sourceName = StringUtils.isEmpty(connectionStatus.getSourceName()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceName();
+ final String destinationId = StringUtils.isEmpty(connectionStatus.getDestinationId()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationId();
+ final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName();
+ final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId();
final String connComponentType = "Connection";
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getOutputBytes());
@@ -501,9 +507,9 @@ public class PrometheusMetricsUtil {
.set(isBackpressureEnabled ? 1 : 0);
}
for (PortStatus portStatus : status.getInputPortStatus()) {
- final String portComponentId = portStatus.getId();
- final String portComponentName = portStatus.getName();
- final String parentId = portStatus.getGroupId();
+ final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
+ final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId();
+ final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String portComponentType = "InputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
@@ -527,9 +533,9 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for (PortStatus portStatus : status.getOutputPortStatus()) {
- final String portComponentId = portStatus.getId();
- final String portComponentName = portStatus.getName();
- final String parentId = portStatus.getGroupId();
+ final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
+ final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
+ final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
final String portComponentType = "OutputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
@@ -553,9 +559,9 @@ public class PrometheusMetricsUtil {
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
- final String rpgComponentId = remoteProcessGroupStatus.getId();
- final String rpgComponentName = remoteProcessGroupStatus.getName();
- final String parentId = remoteProcessGroupStatus.getGroupId();
+ final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId();
+ final String rpgComponentName = StringUtils.isEmpty(remoteProcessGroupStatus.getName()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getName();
+ final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId();
final String rpgComponentType = "RemoteProcessGroup";
AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
@@ -584,7 +590,8 @@ public class PrometheusMetricsUtil {
return NIFI_REGISTRY;
}
- public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) {
+ public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instId) {
+ final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B));
JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage());
JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage());
@@ -604,9 +611,19 @@ public class PrometheusMetricsUtil {
return JVM_REGISTRY;
}
- public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName,
- String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) {
+ public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instId, String connComponentType, String connName,
+ String connId, String pgId, String srcId, String srcName, String destId, String destName) {
if(statusAnalytics != null) {
+ final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
+ final String connComponentId = StringUtils.isEmpty(connId) ? DEFAULT_LABEL_STRING : connId;
+ final String connComponentName = StringUtils.isEmpty(connName) ? DEFAULT_LABEL_STRING : connName;
+ final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId;
+ final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName;
+ final String destinationId = StringUtils.isEmpty(destId) ? DEFAULT_LABEL_STRING : destId;
+ final String destinationName = StringUtils.isEmpty(destName) ? DEFAULT_LABEL_STRING : destName;
+ final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId;
+
+
Map predictions = statusAnalytics.getPredictions();
TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(predictions.get("timeToBytesBackpressureMillis"));
@@ -621,9 +638,17 @@ public class PrometheusMetricsUtil {
return CONNECTION_ANALYTICS_REGISTRY;
}
- public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress,
- String category, String sourceName, String sourceId, String level) {
-
+ public static CollectorRegistry createBulletinMetrics(String instId, String compType, String compId, String pgId, String nodeAddr,
+ String cat, String srcName, String srcId, String lvl) {
+ final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
+ final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
+ final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;
+ final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId;
+ final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName;
+ final String nodeAddress = StringUtils.isEmpty(nodeAddr) ? DEFAULT_LABEL_STRING : nodeAddr;
+ final String category = StringUtils.isEmpty(cat) ? DEFAULT_LABEL_STRING : cat;
+ final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId;
+ final String level = StringUtils.isEmpty(lvl) ? DEFAULT_LABEL_STRING : lvl;
BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1);
return BULLETIN_REGISTRY;
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index 381c68fbbc..77e3f7ddc4 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -145,6 +145,11 @@ public class TestPrometheusReportingTask {
"nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
Assert.assertTrue(content.contains(
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
+ try {
+ testedReportingTask.OnStopped();
+ } catch (Exception e) {
+ // Ignore
+ }
}
private String getMetrics() throws IOException {
@@ -160,4 +165,22 @@ public class TestPrometheusReportingTask {
HttpEntity entity = response.getEntity();
return EntityUtils.toString(entity);
}
+
+ @Test
+ public void testNullLabel() throws IOException, InitializationException {
+ rootGroupStatus.setName(null);
+ testedReportingTask.initialize(reportingInitContextStub);
+ testedReportingTask.onScheduled(configurationContextStub);
+ reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+ testedReportingTask.onTrigger(reportingContextStub);
+
+ String content = getMetrics();
+ Assert.assertTrue(content.contains("parent_id=\"\""));
+
+ try {
+ testedReportingTask.OnStopped();
+ } catch(Exception e) {
+ // Ignore
+ }
+ }
}