NIFI-6352: Add ability to report all component metrics to Prometheus

This closes #3519.

Signed-off-by: Kevin Doran <kdoran@apache.org>
This commit is contained in:
Matthew Burgess 2019-06-05 12:32:22 -04:00 committed by Kevin Doran
parent aefdc9377c
commit 99d6ed244c
No known key found for this signature in database
GPG Key ID: 5621A6244B77AC02
4 changed files with 296 additions and 81 deletions

View File

@ -40,6 +40,10 @@ import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
+ " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
@ -59,7 +63,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
.name("Prometheus Metrics Endpoint Port")
.name("prometheus-reporting-task-metrics-endpoint-port")
.displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -68,7 +73,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("Instance ID")
.name("prometheus-reporting-task-instance-id")
.displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -76,16 +82,27 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-strategy")
.displayName("Metrics Reporting Strategy")
.description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components")
.allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS)
.defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
.required(true)
.build();
public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
.name("Send JVM-metrics")
.description("Send JVM-metrics in addition to the Nifi-metrics")
.name("prometheus-reporting-task-metrics-send-jvm")
.displayName("Send JVM metrics")
.description("Send JVM metrics in addition to the NiFi metrics")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.name("prometheus-reporting-task-ssl-context")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
@ -93,7 +110,8 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Authentication")
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
@ -107,6 +125,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(METRICS_ENDPOINT_PORT);
props.add(INSTANCE_ID);
props.add(METRICS_STRATEGY);
props.add(SEND_JVM_METRICS);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
@ -165,5 +184,6 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
this.prometheusServer.setReportingContext(context);
this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue());
}
}

View File

@ -54,6 +54,7 @@ public class PrometheusServer {
private Server server;
private ServletContextHandler handler;
private ReportingContext context;
private String metricsStrategy;
private boolean sendJvmMetrics;
private String instanceId;
@ -70,7 +71,8 @@ public class PrometheusServer {
rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus();
ServletOutputStream response = resp.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(response);
nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId);
nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy);
TextFormat.write004(osw, nifiRegistry.metricFamilySamples());
if (PrometheusServer.this.sendJvmMetrics == true) {
@ -167,4 +169,11 @@ public class PrometheusServer {
this.instanceId = iid;
}
public String getMetricsStrategy() {
return metricsStrategy;
}
public void setMetricsStrategy(String metricsStrategy) {
this.metricsStrategy = metricsStrategy;
}
}

View File

@ -17,9 +17,11 @@
package org.apache.nifi.reporting.prometheus.api;
import java.util.Collection;
import java.util.Map;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@ -27,100 +29,116 @@ import com.yammer.metrics.core.VirtualMachineMetrics;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
public class PrometheusMetricsUtil {
public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
"Send rollup metrics for the entire root process group");
public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
"Send metrics for each process group");
public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
"Send metrics for each component in the system, to include processors, connections, controller services, etc.");
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
// Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
.name("nifi_process_group_amount_flowfiles_sent")
.help("Total number of FlowFiles in ProcessGroup sent")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
.name("nifi_process_group_amount_flowfiles_transferred")
.help("Total number of FlowFiles in ProcessGroup transferred")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_flowfiles_transferred")
.help("Total number of FlowFiles transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
.name("nifi_process_group_amount_flowfiles_received")
.help("Total number of FlowFiles in ProcessGroup received")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_flowfiles_received")
.help("Total number of FlowFiles received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
.name("nifi_process_group_amount_bytes_sent")
.help("Total number of Bytes in ProcessGroup sent")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
.name("nifi_process_group_amount_bytes_read")
.help("Total number of Bytes in ProcessGroup read")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
.name("nifi_process_group_amount_bytes_written")
.help("Total number of Bytes in ProcessGroup written")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
.name("nifi_process_group_amount_bytes_received")
.help("Total number of Bytes in ProcessGroup received")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_bytes_received")
.help("Total number of bytes received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
.name("nifi_process_group_amount_bytes_transferred")
.help("Total number of Bytes in ProcessGroup transferred")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_bytes_transferred")
.help("Total number of Bytes transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
.name("nifi_process_group_amount_threads_active")
.help("Total number of threads in ProcessGroup active")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_threads_active")
.help("Total number of threads active for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
.name("nifi_process_group_size_content_output_total")
.help("Total size of content output in ProcessGroup")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_size_content_output_total")
.help("Total size of content output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
.name("nifi_process_group_size_content_input_total")
.help("Total size of content input in ProcessGroup")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_size_content_input_total")
.help("Total size of content input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
.name("nifi_process_group_size_content_queued_total")
.help("Total size of content queued in ProcessGroup")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_size_content_queued_total")
.help("Total size of content queued in the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
.name("nifi_process_group_amount_items_output")
.help("Total amount of items in ProcessGroup output")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_items_output")
.help("Total number of items output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
.name("nifi_process_group_amount_items_input")
.help("Total amount of items in ProcessGroup input")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_items_input")
.help("Total number of items input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
.name("nifi_process_group_amount_items_queued")
.help("Total amount of items in ProcessGroup queued")
.labelNames("instance", "process_group_name", "process_group_id")
.name("nifi_amount_items_queued")
.help("Total number of items queued by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
@ -129,6 +147,60 @@ public class PrometheusMetricsUtil {
.labelNames("processor_name", "counter_name", "processor_id", "instance")
.register(NIFI_REGISTRY);
// Connection metrics
private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
.name("nifi_backpressure_bytes_threshold")
.help("The number of bytes that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
.name("nifi_backpressure_object_threshold")
.help("The number of flow files that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
.name("nifi_backpressure_enabled")
.help("Whether backpressure has been applied for this component. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Port metrics
private static final Gauge IS_TRANSMITTING = Gauge.build()
.name("nifi_transmitting")
.help("Whether this component is transmitting data. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
.register(NIFI_REGISTRY);
// Remote Process Group (RPG) metrics
private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_active_remote_port_count")
.help("The number of active remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_inactive_remote_port_count")
.help("The number of inactive remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
.name("nifi_average_lineage_duration")
.help("The average lineage duration (in milliseconds) for all flow file processed by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
///////////////////////////////////////////////////////////////
// JVM Metrics
///////////////////////////////////////////////////////////////
private static final Gauge JVM_HEAP_USED = Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
@ -171,45 +243,159 @@ public class PrometheusMetricsUtil {
.labelNames("instance")
.register(JVM_REGISTRY);
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId) {
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
final String processGroupId = status.getId();
final String processGroupName = status.getName();
Collection<ProcessorStatus> processorStatus = status.getProcessorStatus();
final String componentId = status.getId();
final String componentName = status.getName();
AMOUNT_FLOWFILES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesSent());
AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesTransferred());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesReceived());
AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, processGroupName, processGroupId).set(status.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, processGroupName, processGroupId).set(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesReceived());
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesTransferred());
AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getOutputContentSize());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getInputContentSize());
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedContentSize());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getOutputContentSize());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getInputContentSize());
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getQueuedContentSize());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, processGroupName, processGroupId).set(status.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, processGroupName, processGroupId).set(status.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedCount());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
.set(status.getQueuedCount());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount());
// Report metrics for child process groups if specified
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy));
}
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, processGroupName, processGroupId).set(status.getActiveThreadCount());
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
// Report metrics for all components
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
Map<String, Long> counters = processorStatus.getCounters();
for (ProcessorStatus pstatus : processorStatus) {
Map<String, Long> counters = pstatus.getCounters();
if(counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
}
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
final String connComponentId = connectionStatus.getId();
final String connComponentName = connectionStatus.getName();
final String sourceId = connectionStatus.getSourceId();
final String sourceName = connectionStatus.getSourceName();
final String destinationId = connectionStatus.getDestinationId();
final String destinationName = connectionStatus.getDestinationName();
final String parentId = connectionStatus.getGroupId();
final String connComponentType = "Connection";
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getOutputBytes());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getInputBytes());
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getQueuedBytes());
if(counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
.labels(pstatus.getName(), entry.getKey(), pstatus.getId(), instanceId).set(entry.getValue()));
AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getQueuedCount());
BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getBackPressureBytesThreshold());
BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getBackPressureObjectThreshold());
boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
|| (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(isBackpressureEnabled ? 1 : 0);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
final String portComponentType = "InputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getInputCount());
final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
final String portComponentType = "OutputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getInputCount());
final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
final String rpgComponentId = remoteProcessGroupStatus.getId();
final String rpgComponentName = remoteProcessGroupStatus.getName();
final String parentId = remoteProcessGroupStatus.getGroupId();
final String rpgComponentType = "RemoteProcessGroup";
AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getSentCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getReceivedCount());
ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
.set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
}
}
return NIFI_REGISTRY;
}
public static CollectorRegistry createJvmMetrics(VirtualMachineMetrics jvmMetrics, String instanceId) {

View File

@ -103,9 +103,9 @@ public class TestPrometheusReportingTask {
HttpEntity entity = response.getEntity();
String content = EntityUtils.toString(entity);
Assert.assertEquals(true, content.contains(
"nifi_process_group_amount_flowfiles_received{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
"nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
Assert.assertEquals(true, content.contains(
"nifi_process_group_amount_threads_active{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0"));
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
}
}