diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java index daaddd3a84..7dcfe739f7 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java @@ -40,6 +40,10 @@ import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.eclipse.jetty.server.Server; +import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS; +import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG; +import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT; + @Tags({ "reporting", "prometheus", "metrics", "time series data" }) @CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application." + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance") @@ -59,7 +63,8 @@ public class PrometheusReportingTask extends AbstractReportingTask { + "specified in the SSL Context Service"); public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder() - .name("Prometheus Metrics Endpoint Port") + .name("prometheus-reporting-task-metrics-endpoint-port") + .displayName("Prometheus Metrics Endpoint Port") .description("The Port where prometheus metrics can be accessed") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) @@ -68,7 +73,8 @@ public class PrometheusReportingTask extends AbstractReportingTask { .build(); public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder() - .name("Instance ID") + .name("prometheus-reporting-task-instance-id") + .displayName("Instance ID") .description("Id of this NiFi instance to be included in the metrics sent to Prometheus") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) @@ -76,16 +82,27 @@ public class PrometheusReportingTask extends AbstractReportingTask { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder() + .name("prometheus-reporting-task-metrics-strategy") + .displayName("Metrics Reporting Strategy") + .description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components") + .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS) + .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder() - .name("Send JVM-metrics") - .description("Send JVM-metrics in addition to the Nifi-metrics") + .name("prometheus-reporting-task-metrics-send-jvm") + .displayName("Send JVM metrics") + .description("Send JVM metrics in addition to the NiFi metrics") .allowableValues("true", "false") .defaultValue("false") .required(true) .build(); public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() - .name("SSL Context Service") + .name("prometheus-reporting-task-ssl-context") + .displayName("SSL Context Service") .description("The SSL Context Service to use in order to secure the server. If specified, the server will" + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests") .required(false) @@ -93,7 +110,8 @@ public class PrometheusReportingTask extends AbstractReportingTask { .build(); public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("Client Authentication") + .name("prometheus-reporting-task-client-auth") + .displayName("Client Authentication") .description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the " + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") .required(true) @@ -107,6 +125,7 @@ public class PrometheusReportingTask extends AbstractReportingTask { List props = new ArrayList<>(); props.add(METRICS_ENDPOINT_PORT); props.add(INSTANCE_ID); + props.add(METRICS_STRATEGY); props.add(SEND_JVM_METRICS); props.add(SSL_CONTEXT); props.add(CLIENT_AUTH); @@ -165,5 +184,6 @@ public class PrometheusReportingTask extends AbstractReportingTask { @Override public void onTrigger(final ReportingContext context) { this.prometheusServer.setReportingContext(context); + this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue()); } } diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java index acaa447631..76ef1263b6 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java @@ -54,6 +54,7 @@ public class PrometheusServer { private Server server; private ServletContextHandler handler; private ReportingContext context; + private String metricsStrategy; private boolean sendJvmMetrics; private String instanceId; @@ -70,7 +71,8 @@ public class PrometheusServer { rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus(); ServletOutputStream response = resp.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(response); - nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId); + + nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy); TextFormat.write004(osw, nifiRegistry.metricFamilySamples()); if (PrometheusServer.this.sendJvmMetrics == true) { @@ -167,4 +169,11 @@ public class PrometheusServer { this.instanceId = iid; } + public String getMetricsStrategy() { + return metricsStrategy; + } + + public void setMetricsStrategy(String metricsStrategy) { + this.metricsStrategy = metricsStrategy; + } } diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java index ebd010282c..f4fa64250b 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java @@ -17,9 +17,11 @@ package org.apache.nifi.reporting.prometheus.api; -import java.util.Collection; import java.util.Map; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; @@ -27,100 +29,116 @@ import com.yammer.metrics.core.VirtualMachineMetrics; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.TransmissionStatus; public class PrometheusMetricsUtil { + + public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group", + "Send rollup metrics for the entire root process group"); + public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups", + "Send metrics for each process group"); + public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components", + "Send metrics for each component in the system, to include processors, connections, controller services, etc."); + private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry(); private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry(); + // Process Group metrics private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() - .name("nifi_process_group_amount_flowfiles_sent") - .help("Total number of FlowFiles in ProcessGroup sent") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_flowfiles_sent") + .help("Total number of FlowFiles sent by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build() - .name("nifi_process_group_amount_flowfiles_transferred") - .help("Total number of FlowFiles in ProcessGroup transferred") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_flowfiles_transferred") + .help("Total number of FlowFiles transferred by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build() - .name("nifi_process_group_amount_flowfiles_received") - .help("Total number of FlowFiles in ProcessGroup received") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_flowfiles_received") + .help("Total number of FlowFiles received by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_BYTES_SENT = Gauge.build() - .name("nifi_process_group_amount_bytes_sent") - .help("Total number of Bytes in ProcessGroup sent") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_bytes_sent") + .help("Total number of bytes sent by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_BYTES_READ = Gauge.build() - .name("nifi_process_group_amount_bytes_read") - .help("Total number of Bytes in ProcessGroup read") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_bytes_read") + .help("Total number of bytes read by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build() - .name("nifi_process_group_amount_bytes_written") - .help("Total number of Bytes in ProcessGroup written") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_bytes_written") + .help("Total number of bytes written by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build() - .name("nifi_process_group_amount_bytes_received") - .help("Total number of Bytes in ProcessGroup received") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_bytes_received") + .help("Total number of bytes received by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build() - .name("nifi_process_group_amount_bytes_transferred") - .help("Total number of Bytes in ProcessGroup transferred") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_bytes_transferred") + .help("Total number of Bytes transferred by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); - private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build() - .name("nifi_process_group_amount_threads_active") - .help("Total number of threads in ProcessGroup active") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_threads_active") + .help("Total number of threads active for the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(NIFI_REGISTRY); private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build() - .name("nifi_process_group_size_content_output_total") - .help("Total size of content output in ProcessGroup") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_size_content_output_total") + .help("Total size of content output by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build() - .name("nifi_process_group_size_content_input_total") - .help("Total size of content input in ProcessGroup") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_size_content_input_total") + .help("Total size of content input by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build() - .name("nifi_process_group_size_content_queued_total") - .help("Total size of content queued in ProcessGroup") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_size_content_queued_total") + .help("Total size of content queued in the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build() - .name("nifi_process_group_amount_items_output") - .help("Total amount of items in ProcessGroup output") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_items_output") + .help("Total number of items output by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build() - .name("nifi_process_group_amount_items_input") - .help("Total amount of items in ProcessGroup input") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_items_input") + .help("Total number of items input by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build() - .name("nifi_process_group_amount_items_queued") - .help("Total amount of items in ProcessGroup queued") - .labelNames("instance", "process_group_name", "process_group_id") + .name("nifi_amount_items_queued") + .help("Total number of items queued by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") .register(NIFI_REGISTRY); private static final Gauge PROCESSOR_COUNTERS = Gauge.build() @@ -129,6 +147,60 @@ public class PrometheusMetricsUtil { .labelNames("processor_name", "counter_name", "processor_id", "instance") .register(NIFI_REGISTRY); + // Connection metrics + private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build() + .name("nifi_backpressure_bytes_threshold") + .help("The number of bytes that can be queued before backpressure is applied") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build() + .name("nifi_backpressure_object_threshold") + .help("The number of flow files that can be queued before backpressure is applied") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build() + .name("nifi_backpressure_enabled") + .help("Whether backpressure has been applied for this component. Values are 0 or 1") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + // Port metrics + private static final Gauge IS_TRANSMITTING = Gauge.build() + .name("nifi_transmitting") + .help("Whether this component is transmitting data. Values are 0 or 1") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status") + .register(NIFI_REGISTRY); + + // Remote Process Group (RPG) metrics + private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build() + .name("nifi_active_remote_port_count") + .help("The number of active remote ports associated with this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build() + .name("nifi_inactive_remote_port_count") + .help("The number of inactive remote ports associated with this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build() + .name("nifi_average_lineage_duration") + .help("The average lineage duration (in milliseconds) for all flow file processed by this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + /////////////////////////////////////////////////////////////// + // JVM Metrics + /////////////////////////////////////////////////////////////// private static final Gauge JVM_HEAP_USED = Gauge.build() .name("nifi_jvm_heap_used") .help("NiFi JVM heap used") @@ -171,45 +243,159 @@ public class PrometheusMetricsUtil { .labelNames("instance") .register(JVM_REGISTRY); - public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId) { + public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) { - final String processGroupId = status.getId(); - final String processGroupName = status.getName(); - Collection processorStatus = status.getProcessorStatus(); + final String componentId = status.getId(); + final String componentName = status.getName(); - AMOUNT_FLOWFILES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesSent()); - AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesTransferred()); - AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getFlowFilesReceived()); + AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent()); + AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived()); - AMOUNT_BYTES_SENT.labels(instanceId, processGroupName, processGroupId).set(status.getBytesSent()); - AMOUNT_BYTES_READ.labels(instanceId, processGroupName, processGroupId).set(status.getBytesRead()); - AMOUNT_BYTES_WRITTEN.labels(instanceId, processGroupName, processGroupId).set(status.getBytesWritten()); - AMOUNT_BYTES_RECEIVED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesReceived()); - AMOUNT_BYTES_TRANSFERRED.labels(instanceId, processGroupName, processGroupId).set(status.getBytesTransferred()); + AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived()); + AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred()); - SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getOutputContentSize()); - SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getInputContentSize()); - SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedContentSize()); + SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getOutputContentSize()); + SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getInputContentSize()); + SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getQueuedContentSize()); - AMOUNT_ITEMS_OUTPUT.labels(instanceId, processGroupName, processGroupId).set(status.getOutputCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, processGroupName, processGroupId).set(status.getInputCount()); - AMOUNT_ITEMS_QUEUED.labels(instanceId, processGroupName, processGroupId).set(status.getQueuedCount()); + AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getInputCount()); + AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "") + .set(status.getQueuedCount()); + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount()); + // Report metrics for child process groups if specified + if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { + status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy)); + } - AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, processGroupName, processGroupId).set(status.getActiveThreadCount()); + if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { + // Report metrics for all components + for(ProcessorStatus processorStatus : status.getProcessorStatus()) { + Map counters = processorStatus.getCounters(); - for (ProcessorStatus pstatus : processorStatus) { - Map counters = pstatus.getCounters(); + if(counters != null) { + counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS + .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue())); + } + } + for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { + final String connComponentId = connectionStatus.getId(); + final String connComponentName = connectionStatus.getName(); + final String sourceId = connectionStatus.getSourceId(); + final String sourceName = connectionStatus.getSourceName(); + final String destinationId = connectionStatus.getDestinationId(); + final String destinationName = connectionStatus.getDestinationName(); + final String parentId = connectionStatus.getGroupId(); + final String connComponentType = "Connection"; + SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getOutputBytes()); + SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getInputBytes()); + SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getQueuedBytes()); - if(counters != null) { - counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS - .labels(pstatus.getName(), entry.getKey(), pstatus.getId(), instanceId).set(entry.getValue())); + AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getInputCount()); + AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getQueuedCount()); + + BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getBackPressureBytesThreshold()); + BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getBackPressureObjectThreshold()); + boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount()) + || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes()); + IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(isBackpressureEnabled ? 1 : 0); + } + for(PortStatus portStatus : status.getInputPortStatus()) { + final String portComponentId = portStatus.getId(); + final String portComponentName = portStatus.getName(); + final String parentId = portStatus.getGroupId(); + final String portComponentType = "InputPort"; + AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); + + AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getInputCount()); + + final Boolean isTransmitting = portStatus.isTransmitting(); + IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) + .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); + } + for(PortStatus portStatus : status.getOutputPortStatus()) { + final String portComponentId = portStatus.getId(); + final String portComponentName = portStatus.getName(); + final String parentId = portStatus.getGroupId(); + final String portComponentType = "OutputPort"; + AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived()); + + AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getInputCount()); + + final Boolean isTransmitting = portStatus.isTransmitting(); + IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) + .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); + } + for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { + final String rpgComponentId = remoteProcessGroupStatus.getId(); + final String rpgComponentName = remoteProcessGroupStatus.getName(); + final String parentId = remoteProcessGroupStatus.getGroupId(); + final String rpgComponentType = "RemoteProcessGroup"; + + AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") + .set(remoteProcessGroupStatus.getSentCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") + .set(remoteProcessGroupStatus.getReceivedCount()); + + ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount()); + INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount()); + + AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration()); + + IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name()) + .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount()); } } return NIFI_REGISTRY; - } public static CollectorRegistry createJvmMetrics(VirtualMachineMetrics jvmMetrics, String instanceId) { diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java index bba7a8b181..8cb18b254c 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java @@ -103,9 +103,9 @@ public class TestPrometheusReportingTask { HttpEntity entity = response.getEntity(); String content = EntityUtils.toString(entity); Assert.assertEquals(true, content.contains( - "nifi_process_group_amount_flowfiles_received{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0")); + "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0")); Assert.assertEquals(true, content.contains( - "nifi_process_group_amount_threads_active{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",} 5.0")); + "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0")); } }