mirror of https://github.com/apache/nifi.git
NIFI-7378: Ensure label values are not null in Prometheus metrics (#4219)
This commit is contained in:
parent
e2716a6c94
commit
cd10435b9f
|
@ -43,6 +43,11 @@
|
|||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-properties</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics</artifactId>
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
|
|||
import org.apache.nifi.metrics.jvm.JvmMetrics;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
public class PrometheusMetricsUtil {
|
||||
|
||||
|
@ -59,6 +60,8 @@ public class PrometheusMetricsUtil {
|
|||
|
||||
public static final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
|
||||
|
||||
protected static final String DEFAULT_LABEL_STRING = "";
|
||||
|
||||
// Common properties/values
|
||||
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
|
||||
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
|
||||
|
@ -368,10 +371,13 @@ public class PrometheusMetricsUtil {
|
|||
.labelNames("instance", "gc_name")
|
||||
.register(JVM_REGISTRY);
|
||||
|
||||
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
|
||||
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
|
||||
|
||||
final String componentId = status.getId();
|
||||
final String componentName = status.getName();
|
||||
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
|
||||
final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
|
||||
final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
|
||||
final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
|
||||
final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();
|
||||
|
||||
// Clear all collectors to deal with removed/renamed components -- for root PG only
|
||||
if("RootProcessGroup".equals(componentType)) {
|
||||
|
@ -434,9 +440,9 @@ public class PrometheusMetricsUtil {
|
|||
}
|
||||
|
||||
final String procComponentType = "Processor";
|
||||
final String procComponentId = processorStatus.getId();
|
||||
final String procComponentName = processorStatus.getName();
|
||||
final String parentId = processorStatus.getGroupId();
|
||||
final String procComponentId = StringUtils.isEmpty(processorStatus.getId()) ? DEFAULT_LABEL_STRING : processorStatus.getId();
|
||||
final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName();
|
||||
final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId();
|
||||
|
||||
AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesSent());
|
||||
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesReceived());
|
||||
|
@ -469,13 +475,13 @@ public class PrometheusMetricsUtil {
|
|||
|
||||
}
|
||||
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
|
||||
final String connComponentId = connectionStatus.getId();
|
||||
final String connComponentName = connectionStatus.getName();
|
||||
final String sourceId = connectionStatus.getSourceId();
|
||||
final String sourceName = connectionStatus.getSourceName();
|
||||
final String destinationId = connectionStatus.getDestinationId();
|
||||
final String destinationName = connectionStatus.getDestinationName();
|
||||
final String parentId = connectionStatus.getGroupId();
|
||||
final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId();
|
||||
final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName();
|
||||
final String sourceId = StringUtils.isEmpty(connectionStatus.getSourceId()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceId();
|
||||
final String sourceName = StringUtils.isEmpty(connectionStatus.getSourceName()) ? DEFAULT_LABEL_STRING : connectionStatus.getSourceName();
|
||||
final String destinationId = StringUtils.isEmpty(connectionStatus.getDestinationId()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationId();
|
||||
final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName();
|
||||
final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId();
|
||||
final String connComponentType = "Connection";
|
||||
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(connectionStatus.getOutputBytes());
|
||||
|
@ -501,9 +507,9 @@ public class PrometheusMetricsUtil {
|
|||
.set(isBackpressureEnabled ? 1 : 0);
|
||||
}
|
||||
for (PortStatus portStatus : status.getInputPortStatus()) {
|
||||
final String portComponentId = portStatus.getId();
|
||||
final String portComponentName = portStatus.getName();
|
||||
final String parentId = portStatus.getGroupId();
|
||||
final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
|
||||
final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId();
|
||||
final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
|
||||
final String portComponentType = "InputPort";
|
||||
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
|
||||
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
|
||||
|
@ -527,9 +533,9 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
|
||||
}
|
||||
for (PortStatus portStatus : status.getOutputPortStatus()) {
|
||||
final String portComponentId = portStatus.getId();
|
||||
final String portComponentName = portStatus.getName();
|
||||
final String parentId = portStatus.getGroupId();
|
||||
final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
|
||||
final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
|
||||
final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
|
||||
final String portComponentType = "OutputPort";
|
||||
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
|
||||
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
|
||||
|
@ -553,9 +559,9 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
|
||||
}
|
||||
for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
|
||||
final String rpgComponentId = remoteProcessGroupStatus.getId();
|
||||
final String rpgComponentName = remoteProcessGroupStatus.getName();
|
||||
final String parentId = remoteProcessGroupStatus.getGroupId();
|
||||
final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId();
|
||||
final String rpgComponentName = StringUtils.isEmpty(remoteProcessGroupStatus.getName()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getName();
|
||||
final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId();
|
||||
final String rpgComponentType = "RemoteProcessGroup";
|
||||
|
||||
AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
|
||||
|
@ -584,7 +590,8 @@ public class PrometheusMetricsUtil {
|
|||
return NIFI_REGISTRY;
|
||||
}
|
||||
|
||||
public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) {
|
||||
public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instId) {
|
||||
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
|
||||
JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B));
|
||||
JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage());
|
||||
JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage());
|
||||
|
@ -604,9 +611,19 @@ public class PrometheusMetricsUtil {
|
|||
return JVM_REGISTRY;
|
||||
}
|
||||
|
||||
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName,
|
||||
String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) {
|
||||
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instId, String connComponentType, String connName,
|
||||
String connId, String pgId, String srcId, String srcName, String destId, String destName) {
|
||||
if(statusAnalytics != null) {
|
||||
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
|
||||
final String connComponentId = StringUtils.isEmpty(connId) ? DEFAULT_LABEL_STRING : connId;
|
||||
final String connComponentName = StringUtils.isEmpty(connName) ? DEFAULT_LABEL_STRING : connName;
|
||||
final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId;
|
||||
final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName;
|
||||
final String destinationId = StringUtils.isEmpty(destId) ? DEFAULT_LABEL_STRING : destId;
|
||||
final String destinationName = StringUtils.isEmpty(destName) ? DEFAULT_LABEL_STRING : destName;
|
||||
final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId;
|
||||
|
||||
|
||||
Map<String, Long> predictions = statusAnalytics.getPredictions();
|
||||
TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(predictions.get("timeToBytesBackpressureMillis"));
|
||||
|
@ -621,9 +638,17 @@ public class PrometheusMetricsUtil {
|
|||
return CONNECTION_ANALYTICS_REGISTRY;
|
||||
}
|
||||
|
||||
public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress,
|
||||
String category, String sourceName, String sourceId, String level) {
|
||||
|
||||
public static CollectorRegistry createBulletinMetrics(String instId, String compType, String compId, String pgId, String nodeAddr,
|
||||
String cat, String srcName, String srcId, String lvl) {
|
||||
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
|
||||
final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
|
||||
final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;
|
||||
final String sourceId = StringUtils.isEmpty(srcId) ? DEFAULT_LABEL_STRING : srcId;
|
||||
final String sourceName = StringUtils.isEmpty(srcName) ? DEFAULT_LABEL_STRING : srcName;
|
||||
final String nodeAddress = StringUtils.isEmpty(nodeAddr) ? DEFAULT_LABEL_STRING : nodeAddr;
|
||||
final String category = StringUtils.isEmpty(cat) ? DEFAULT_LABEL_STRING : cat;
|
||||
final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId;
|
||||
final String level = StringUtils.isEmpty(lvl) ? DEFAULT_LABEL_STRING : lvl;
|
||||
BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1);
|
||||
return BULLETIN_REGISTRY;
|
||||
}
|
||||
|
|
|
@ -145,6 +145,11 @@ public class TestPrometheusReportingTask {
|
|||
"nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
|
||||
Assert.assertTrue(content.contains(
|
||||
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
|
||||
try {
|
||||
testedReportingTask.OnStopped();
|
||||
} catch (Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
private String getMetrics() throws IOException {
|
||||
|
@ -160,4 +165,22 @@ public class TestPrometheusReportingTask {
|
|||
HttpEntity entity = response.getEntity();
|
||||
return EntityUtils.toString(entity);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullLabel() throws IOException, InitializationException {
|
||||
rootGroupStatus.setName(null);
|
||||
testedReportingTask.initialize(reportingInitContextStub);
|
||||
testedReportingTask.onScheduled(configurationContextStub);
|
||||
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
|
||||
testedReportingTask.onTrigger(reportingContextStub);
|
||||
|
||||
String content = getMetrics();
|
||||
Assert.assertTrue(content.contains("parent_id=\"\""));
|
||||
|
||||
try {
|
||||
testedReportingTask.OnStopped();
|
||||
} catch(Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue