diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java index 622726024d..b894beeef3 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java @@ -1,415 +1,434 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.reporting.prometheus.api; - -import java.util.Map; - -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.controller.status.ConnectionStatus; -import org.apache.nifi.controller.status.PortStatus; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; - -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Gauge; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.TransmissionStatus; -import org.apache.nifi.metrics.jvm.JvmMetrics; -import org.apache.nifi.processor.DataUnit; - -public class PrometheusMetricsUtil { - - public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group", - "Send rollup metrics for the entire root process group"); - public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups", - "Send metrics for each process group"); - public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components", - "Send metrics for each component in the system, to include processors, connections, controller services, etc."); - - private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry(); - private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry(); - - // Process Group metrics - private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() - .name("nifi_amount_flowfiles_sent") - .help("Total number of FlowFiles sent by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build() - .name("nifi_amount_flowfiles_transferred") - .help("Total number of FlowFiles transferred by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build() - .name("nifi_amount_flowfiles_received") - .help("Total number of FlowFiles received by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_BYTES_SENT = Gauge.build() - .name("nifi_amount_bytes_sent") - .help("Total number of bytes sent by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_BYTES_READ = Gauge.build() - .name("nifi_amount_bytes_read") - .help("Total number of bytes read by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build() - .name("nifi_amount_bytes_written") - .help("Total number of bytes written by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build() - .name("nifi_amount_bytes_received") - .help("Total number of bytes received by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build() - .name("nifi_amount_bytes_transferred") - .help("Total number of Bytes transferred by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build() - .name("nifi_amount_threads_active") - .help("Total number of threads active for the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") - .register(NIFI_REGISTRY); - - private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build() - .name("nifi_size_content_output_total") - .help("Total size of content output by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build() - .name("nifi_size_content_input_total") - .help("Total size of content input by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build() - .name("nifi_size_content_queued_total") - .help("Total size of content queued in the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build() - .name("nifi_amount_items_output") - .help("Total number of items output by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build() - .name("nifi_amount_items_input") - .help("Total number of items input by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build() - .name("nifi_amount_items_queued") - .help("Total number of items queued by the component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge PROCESSOR_COUNTERS = Gauge.build() - .name("nifi_processor_counters") - .help("Counters exposed by NiFi Processors") - .labelNames("processor_name", "counter_name", "processor_id", "instance") - .register(NIFI_REGISTRY); - - // Connection metrics - private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build() - .name("nifi_backpressure_bytes_threshold") - .help("The number of bytes that can be queued before backpressure is applied") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build() - .name("nifi_backpressure_object_threshold") - .help("The number of flow files that can be queued before backpressure is applied") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build() - .name("nifi_backpressure_enabled") - .help("Whether backpressure has been applied for this component. Values are 0 or 1") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - // Port metrics - private static final Gauge IS_TRANSMITTING = Gauge.build() - .name("nifi_transmitting") - .help("Whether this component is transmitting data. Values are 0 or 1") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status") - .register(NIFI_REGISTRY); - - // Remote Process Group (RPG) metrics - private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build() - .name("nifi_active_remote_port_count") - .help("The number of active remote ports associated with this component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build() - .name("nifi_inactive_remote_port_count") - .help("The number of inactive remote ports associated with this component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build() - .name("nifi_average_lineage_duration") - .help("The average lineage duration (in milliseconds) for all flow file processed by this component") - .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", - "source_id", "source_name", "destination_id", "destination_name") - .register(NIFI_REGISTRY); - - /////////////////////////////////////////////////////////////// - // JVM Metrics - /////////////////////////////////////////////////////////////// - private static final Gauge JVM_HEAP_USED = Gauge.build() - .name("nifi_jvm_heap_used") - .help("NiFi JVM heap used") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_HEAP_USAGE = Gauge.build() - .name("nifi_jvm_heap_usage") - .help("NiFi JVM heap usage") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build() - .name("nifi_jvm_heap_non_usage") - .help("NiFi JVM heap non usage") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_THREAD_COUNT = Gauge.build() - .name("nifi_jvm_thread_count") - .help("NiFi JVM thread count") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build() - .name("nifi_jvm_daemon_thread_count") - .help("NiFi JVM daemon thread count") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_UPTIME = Gauge.build() - .name("nifi_jvm_uptime") - .help("NiFi JVM uptime") - .labelNames("instance") - .register(JVM_REGISTRY); - - private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build() - .name("nifi_jvm_file_descriptor_usage") - .help("NiFi JVM file descriptor usage") - .labelNames("instance") - .register(JVM_REGISTRY); - - public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) { - - final String componentId = status.getId(); - final String componentName = status.getName(); - - AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent()); - AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred()); - AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived()); - - AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent()); - AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead()); - AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten()); - AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived()); - AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred()); - - SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") - .set(status.getOutputContentSize()); - SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") - .set(status.getInputContentSize()); - SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") - .set(status.getQueuedContentSize()); - - AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") - .set(status.getOutputCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") - .set(status.getInputCount()); - AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "") - .set(status.getQueuedCount()); - - AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getActiveThreadCount()); - - // Report metrics for child process groups if specified - if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { - status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy)); - } - - if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { - // Report metrics for all components - for(ProcessorStatus processorStatus : status.getProcessorStatus()) { - Map counters = processorStatus.getCounters(); - - if(counters != null) { - counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS - .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue())); - } - } - for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { - final String connComponentId = connectionStatus.getId(); - final String connComponentName = connectionStatus.getName(); - final String sourceId = connectionStatus.getSourceId(); - final String sourceName = connectionStatus.getSourceName(); - final String destinationId = connectionStatus.getDestinationId(); - final String destinationName = connectionStatus.getDestinationName(); - final String parentId = connectionStatus.getGroupId(); - final String connComponentType = "Connection"; - SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getOutputBytes()); - SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getInputBytes()); - SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getQueuedBytes()); - - AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getOutputCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getInputCount()); - AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getQueuedCount()); - - BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getBackPressureBytesThreshold()); - BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(connectionStatus.getBackPressureObjectThreshold()); - boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount()) - || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes()); - IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) - .set(isBackpressureEnabled ? 1 : 0); - } - for(PortStatus portStatus : status.getInputPortStatus()) { - final String portComponentId = portStatus.getId(); - final String portComponentName = portStatus.getName(); - final String parentId = portStatus.getGroupId(); - final String portComponentType = "InputPort"; - AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); - AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); - - AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); - AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); - AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); - AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); - - AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") - .set(portStatus.getOutputCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") - .set(portStatus.getInputCount()); - - final Boolean isTransmitting = portStatus.isTransmitting(); - IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) - .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); - - AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); - } - for(PortStatus portStatus : status.getOutputPortStatus()) { - final String portComponentId = portStatus.getId(); - final String portComponentName = portStatus.getName(); - final String parentId = portStatus.getGroupId(); - final String portComponentType = "OutputPort"; - AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); - AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived()); - - AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); - AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); - AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); - AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); - - AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") - .set(portStatus.getOutputCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") - .set(portStatus.getInputCount()); - - final Boolean isTransmitting = portStatus.isTransmitting(); - IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) - .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); - - AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); - } - for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { - final String rpgComponentId = remoteProcessGroupStatus.getId(); - final String rpgComponentName = remoteProcessGroupStatus.getName(); - final String parentId = remoteProcessGroupStatus.getGroupId(); - final String rpgComponentType = "RemoteProcessGroup"; - - AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize()); - AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize()); - - AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") - .set(remoteProcessGroupStatus.getSentCount()); - AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") - .set(remoteProcessGroupStatus.getReceivedCount()); - - ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount()); - INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount()); - - AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration()); - - IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name()) - .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0); - - AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount()); - } - } - - return NIFI_REGISTRY; - } - - public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) { - JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B)); - JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage()); - JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage()); - - JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount()); - JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount()); - - JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime()); - JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage()); - - return JVM_REGISTRY; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting.prometheus.api; + +import java.util.Map; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.metrics.jvm.JvmMetrics; +import org.apache.nifi.processor.DataUnit; + +public class PrometheusMetricsUtil { + + public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group", + "Send rollup metrics for the entire root process group"); + public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups", + "Send metrics for each process group"); + public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components", + "Send metrics for each component in the system, to include processors, connections, controller services, etc."); + + private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry(); + private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry(); + + // Process Group metrics + private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() + .name("nifi_amount_flowfiles_sent") + .help("Total number of FlowFiles sent by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build() + .name("nifi_amount_flowfiles_transferred") + .help("Total number of FlowFiles transferred by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build() + .name("nifi_amount_flowfiles_received") + .help("Total number of FlowFiles received by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_BYTES_SENT = Gauge.build() + .name("nifi_amount_bytes_sent") + .help("Total number of bytes sent by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_BYTES_READ = Gauge.build() + .name("nifi_amount_bytes_read") + .help("Total number of bytes read by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build() + .name("nifi_amount_bytes_written") + .help("Total number of bytes written by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build() + .name("nifi_amount_bytes_received") + .help("Total number of bytes received by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build() + .name("nifi_amount_bytes_transferred") + .help("Total number of Bytes transferred by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build() + .name("nifi_amount_threads_active") + .help("Total number of threads active for the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_THREADS_TOTAL_TERMINATED = Gauge.build() + .name("nifi_amount_threads_terminated") + .help("Total number of threads terminated for the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(NIFI_REGISTRY); + + private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build() + .name("nifi_size_content_output_total") + .help("Total size of content output by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build() + .name("nifi_size_content_input_total") + .help("Total size of content input by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build() + .name("nifi_size_content_queued_total") + .help("Total size of content queued in the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build() + .name("nifi_amount_items_output") + .help("Total number of items output by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build() + .name("nifi_amount_items_input") + .help("Total number of items input by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build() + .name("nifi_amount_items_queued") + .help("Total number of items queued by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge PROCESSOR_COUNTERS = Gauge.build() + .name("nifi_processor_counters") + .help("Counters exposed by NiFi Processors") + .labelNames("processor_name", "counter_name", "processor_id", "instance") + .register(NIFI_REGISTRY); + + // Connection metrics + private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build() + .name("nifi_backpressure_bytes_threshold") + .help("The number of bytes that can be queued before backpressure is applied") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build() + .name("nifi_backpressure_object_threshold") + .help("The number of flow files that can be queued before backpressure is applied") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build() + .name("nifi_backpressure_enabled") + .help("Whether backpressure has been applied for this component. Values are 0 or 1") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + // Port metrics + private static final Gauge IS_TRANSMITTING = Gauge.build() + .name("nifi_transmitting") + .help("Whether this component is transmitting data. Values are 0 or 1") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status") + .register(NIFI_REGISTRY); + + // Remote Process Group (RPG) metrics + private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build() + .name("nifi_active_remote_port_count") + .help("The number of active remote ports associated with this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build() + .name("nifi_inactive_remote_port_count") + .help("The number of inactive remote ports associated with this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build() + .name("nifi_average_lineage_duration") + .help("The average lineage duration (in milliseconds) for all flow file processed by this component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", + "source_id", "source_name", "destination_id", "destination_name") + .register(NIFI_REGISTRY); + + /////////////////////////////////////////////////////////////// + // JVM Metrics + /////////////////////////////////////////////////////////////// + private static final Gauge JVM_HEAP_USED = Gauge.build() + .name("nifi_jvm_heap_used") + .help("NiFi JVM heap used") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_HEAP_USAGE = Gauge.build() + .name("nifi_jvm_heap_usage") + .help("NiFi JVM heap usage") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build() + .name("nifi_jvm_heap_non_usage") + .help("NiFi JVM heap non usage") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_THREAD_COUNT = Gauge.build() + .name("nifi_jvm_thread_count") + .help("NiFi JVM thread count") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build() + .name("nifi_jvm_daemon_thread_count") + .help("NiFi JVM daemon thread count") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_UPTIME = Gauge.build() + .name("nifi_jvm_uptime") + .help("NiFi JVM uptime") + .labelNames("instance") + .register(JVM_REGISTRY); + + private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build() + .name("nifi_jvm_file_descriptor_usage") + .help("NiFi JVM file descriptor usage") + .labelNames("instance") + .register(JVM_REGISTRY); + + public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) { + + final String componentId = status.getId(); + final String componentName = status.getName(); + + AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent()); + AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived()); + + AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived()); + AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred()); + + SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getOutputContentSize()); + SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getInputContentSize()); + SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getQueuedContentSize()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") + .set(status.getInputCount()); + AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "") + .set(status.getQueuedCount()); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId) + .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount()); + AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, componentType, componentName, componentId, parentPGId) + .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount()); + + // Report metrics for child process groups if specified + if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { + status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy)); + } + + if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { + // Report metrics for all components + for(ProcessorStatus processorStatus : status.getProcessorStatus()) { + Map counters = processorStatus.getCounters(); + + if(counters != null) { + counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS + .labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue())); + } + + final String procComponentType = "Processor"; + final String procComponentId = processorStatus.getId(); + final String procComponentName = processorStatus.getName(); + final String parentId = processorStatus.getGroupId(); + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) + .set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount()); + AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) + .set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount()); + + } + for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { + final String connComponentId = connectionStatus.getId(); + final String connComponentName = connectionStatus.getName(); + final String sourceId = connectionStatus.getSourceId(); + final String sourceName = connectionStatus.getSourceName(); + final String destinationId = connectionStatus.getDestinationId(); + final String destinationName = connectionStatus.getDestinationName(); + final String parentId = connectionStatus.getGroupId(); + final String connComponentType = "Connection"; + SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getOutputBytes()); + SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getInputBytes()); + SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getQueuedBytes()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getInputCount()); + AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getQueuedCount()); + + BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getBackPressureBytesThreshold()); + BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(connectionStatus.getBackPressureObjectThreshold()); + boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount()) + || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes()); + IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) + .set(isBackpressureEnabled ? 1 : 0); + } + for(PortStatus portStatus : status.getInputPortStatus()) { + final String portComponentId = portStatus.getId(); + final String portComponentName = portStatus.getName(); + final String parentId = portStatus.getGroupId(); + final String portComponentType = "InputPort"; + AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); + + AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getInputCount()); + + final Boolean isTransmitting = portStatus.isTransmitting(); + IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) + .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); + } + for(PortStatus portStatus : status.getOutputPortStatus()) { + final String portComponentId = portStatus.getId(); + final String portComponentName = portStatus.getName(); + final String parentId = portStatus.getGroupId(); + final String portComponentType = "OutputPort"; + AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); + AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId).set(portStatus.getFlowFilesReceived()); + + AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); + AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); + AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getOutputCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") + .set(portStatus.getInputCount()); + + final Boolean isTransmitting = portStatus.isTransmitting(); + IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) + .set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); + } + for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { + final String rpgComponentId = remoteProcessGroupStatus.getId(); + final String rpgComponentName = remoteProcessGroupStatus.getName(); + final String parentId = remoteProcessGroupStatus.getGroupId(); + final String rpgComponentType = "RemoteProcessGroup"; + + AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize()); + AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize()); + + AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") + .set(remoteProcessGroupStatus.getSentCount()); + AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") + .set(remoteProcessGroupStatus.getReceivedCount()); + + ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount()); + INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount()); + + AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getAverageLineageDuration()); + + IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name()) + .set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0); + + AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount()); + } + } + + return NIFI_REGISTRY; + } + + public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) { + JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B)); + JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage()); + JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage()); + + JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount()); + JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount()); + + JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime()); + JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage()); + + return JVM_REGISTRY; + } + +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 618c40ca91..1c3f81065f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -254,7 +254,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "outputCount", status.getOutputCount()); addField(builder, "queuedContentSize", status.getQueuedContentSize()); addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount()); addField(builder, "queuedCount", status.getQueuedCount()); + addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name()); arrayBuilder.add(builder.build()); } @@ -305,6 +307,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "sentContentSize", status.getSentContentSize()); addField(builder, "sentCount", status.getSentCount()); addField(builder, "averageLineageDuration", status.getAverageLineageDuration()); + addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name()); + addField(builder, "targetURI", status.getTargetUri()); arrayBuilder.add(builder.build()); } @@ -329,6 +333,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "inputCount", status.getInputCount()); addField(builder, "outputBytes", status.getOutputBytes()); addField(builder, "outputCount", status.getOutputCount()); + addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name()); + addField(builder, "transmitting", status.isTransmitting()); arrayBuilder.add(builder.build()); } @@ -359,6 +365,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "outputCount", status.getOutputCount()); addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold()); addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold()); + addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold()); addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount()) || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes()))); @@ -390,8 +397,12 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "outputCount", status.getOutputCount()); addField(builder, "outputBytes", status.getOutputBytes()); addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount()); addField(builder, "invocations", status.getInvocations()); addField(builder, "processingNanos", status.getProcessingNanos()); + addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name()); + addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name()); + addField(builder, factory, "counters", status.getCounters()); arrayBuilder.add(builder.build()); } @@ -411,4 +422,28 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "application", applicationName); } + private void addField(final JsonObjectBuilder builder, final String key, final Boolean value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values) { + if (values == null) { + return; + } + + final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); + for (final Map.Entry entry : values.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + + mapBuilder.add(entry.getKey(), entry.getValue()); + } + + builder.add(key, mapBuilder); + } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html index 2d0be38717..ab32bade3f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html @@ -72,13 +72,18 @@ // PG + Processors { "name" : "bytesRead", "type" : ["long", "null"]}, { "name" : "bytesWritten", "type" : ["long", "null"]}, - + { "name" : "terminatedThreadCount", "type" : ["long", "null"]}, + + // Processors + Ports + { "name" : "runStatus", "type" : ["string", "null"]}, + // fields for process group status { "name" : "bytesTransferred", "type" : ["long", "null"]}, { "name" : "flowFilesTransferred", "type" : ["long", "null"]}, { "name" : "inputContentSize", "type" : ["long", "null"]}, { "name" : "outputContentSize", "type" : ["long", "null"]}, { "name" : "queuedContentSize", "type" : ["long", "null"]}, + { "name" : "versionedFlowStatus", "type" : ["string", "null"]}, // fields for remote process groups { "name" : "activeRemotePortCount", "type" : ["long", "null"]}, @@ -88,12 +93,15 @@ { "name" : "sentContentSize", "type" : ["long", "null"]}, { "name" : "sentCount", "type" : ["long", "null"]}, { "name" : "averageLineageDuration", "type" : ["long", "null"]}, + { "name" : "transmissionStatus", "type" : ["string", "null"]}, + { "name" : "targetURI", "type" : ["string", "null"]}, // fields for input/output ports + connections + PG { "name" : "inputBytes", "type" : ["long", "null"]}, { "name" : "inputCount", "type" : ["long", "null"]}, { "name" : "outputBytes", "type" : ["long", "null"]}, { "name" : "outputCount", "type" : ["long", "null"]}, + { "name" : "transmitting", "type" : ["boolean", "null"]}, // fields for connections { "name" : "sourceId", "type" : ["string", "null"]}, @@ -105,6 +113,7 @@ { "name" : "queuedBytes", "type" : ["long", "null"]}, { "name" : "backPressureBytesThreshold", "type" : ["long", "null"]}, { "name" : "backPressureObjectThreshold", "type" : ["long", "null"]}, + { "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]}, { "name" : "isBackPressureEnabled", "type" : ["string", "null"]}, // fields for processors @@ -112,7 +121,9 @@ { "name" : "averageLineageDurationMS", "type" : ["long", "null"]}, { "name" : "flowFilesRemoved", "type" : ["long", "null"]}, { "name" : "invocations", "type" : ["long", "null"]}, - { "name" : "processingNanos", "type" : ["long", "null"]} + { "name" : "processingNanos", "type" : ["long", "null"]}, + { "name" : "executionNode", "type" : ["string", "null"]}, + { "name" : "counters", "type": { "type": "map", "values": "string" }} ] } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc index 6f16d0e91f..4190015353 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc @@ -33,13 +33,18 @@ // PG + Processors { "name" : "bytesRead", "type" : ["long", "null"]}, { "name" : "bytesWritten", "type" : ["long", "null"]}, - + { "name" : "terminatedThreadCount", "type" : ["long", "null"]}, + + // Processors + Ports + { "name" : "runStatus", "type" : ["string", "null"]}, + // fields for process group status { "name" : "bytesTransferred", "type" : ["long", "null"]}, { "name" : "flowFilesTransferred", "type" : ["long", "null"]}, { "name" : "inputContentSize", "type" : ["long", "null"]}, { "name" : "outputContentSize", "type" : ["long", "null"]}, { "name" : "queuedContentSize", "type" : ["long", "null"]}, + { "name" : "versionedFlowStatus", "type" : ["string", "null"]}, // fields for remote process groups { "name" : "activeRemotePortCount", "type" : ["long", "null"]}, @@ -49,12 +54,15 @@ { "name" : "sentContentSize", "type" : ["long", "null"]}, { "name" : "sentCount", "type" : ["long", "null"]}, { "name" : "averageLineageDuration", "type" : ["long", "null"]}, + { "name" : "transmissionStatus", "type" : ["string", "null"]}, + { "name" : "targetURI", "type" : ["string", "null"]}, // fields for input/output ports + connections + PG { "name" : "inputBytes", "type" : ["long", "null"]}, { "name" : "inputCount", "type" : ["long", "null"]}, { "name" : "outputBytes", "type" : ["long", "null"]}, { "name" : "outputCount", "type" : ["long", "null"]}, + { "name" : "transmitting", "type" : ["boolean", "null"]}, // fields for connections { "name" : "sourceId", "type" : ["string", "null"]}, @@ -66,6 +74,7 @@ { "name" : "queuedBytes", "type" : ["long", "null"]}, { "name" : "backPressureBytesThreshold", "type" : ["long", "null"]}, { "name" : "backPressureObjectThreshold", "type" : ["long", "null"]}, + { "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]}, { "name" : "isBackPressureEnabled", "type" : ["string", "null"]}, // fields for processors @@ -73,6 +82,8 @@ { "name" : "averageLineageDurationMS", "type" : ["long", "null"]}, { "name" : "flowFilesRemoved", "type" : ["long", "null"]}, { "name" : "invocations", "type" : ["long", "null"]}, - { "name" : "processingNanos", "type" : ["long", "null"]} + { "name" : "processingNanos", "type" : ["long", "null"]}, + { "name" : "executionNode", "type" : ["string", "null"]}, + { "name" : "counters", "type": { "type": "map", "values": "string" }} ] } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 0d537c2719..46be7763b8 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -19,6 +19,8 @@ package org.apache.nifi.reporting; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -31,6 +33,7 @@ import java.util.Map; import java.util.UUID; import javax.json.Json; +import javax.json.JsonNumber; import javax.json.JsonObject; import javax.json.JsonReader; import javax.json.JsonString; @@ -42,8 +45,11 @@ import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; @@ -108,8 +114,13 @@ public class TestSiteToSiteStatusReportingTask { assertEquals(16, task.dataSent.size()); final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); - JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId"); + JsonObject firstElement = jsonReader.readArray().getJsonObject(0); + JsonString componentId = firstElement.getJsonString("componentId"); assertEquals(pgStatus.getId(), componentId.getString()); + JsonNumber terminatedThreads = firstElement.getJsonNumber("terminatedThreadCount"); + assertEquals(1, terminatedThreads.longValue()); + JsonString versionedFlowState = firstElement.getJsonString("versionedFlowState"); + assertEquals("UP_TO_DATE", versionedFlowState.getString()); } @Test @@ -150,6 +161,10 @@ public class TestSiteToSiteStatusReportingTask { JsonString source = object.getJsonString("sourceName"); assertEquals("true", backpressure.getString()); assertEquals("source", source.getString()); + JsonString dataSizeThreshold = object.getJsonString("backPressureDataSizeThreshold"); + JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold"); + assertEquals("1 KB", dataSizeThreshold.getString()); + assertEquals(1024, bytesThreshold.intValue()); } @Test @@ -190,6 +205,80 @@ public class TestSiteToSiteStatusReportingTask { assertEquals("root.1.1.processor.1", componentId.getString()); } + @Test + public void testPortStatus() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonString runStatus = object.getJsonString("runStatus"); + assertEquals(RunStatus.Stopped.name(), runStatus.getString()); + boolean isTransmitting = object.getBoolean("transmitting"); + assertFalse(isTransmitting); + JsonNumber inputBytes = object.getJsonNumber("inputBytes"); + assertEquals(5, inputBytes.intValue()); + } + + @Test + public void testRemoteProcessGroupStatus() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(3, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject firstElement = jsonReader.readArray().getJsonObject(0); + JsonNumber activeThreadCount = firstElement.getJsonNumber("activeThreadCount"); + assertEquals(1L, activeThreadCount.longValue()); + JsonString transmissionStatus = firstElement.getJsonString("transmissionStatus"); + assertEquals("Transmitting", transmissionStatus.getString()); + } + + @Test + public void testProcessorStatus() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonString runStatus = object.getJsonString("runStatus"); + assertEquals(RunStatus.Running.name(), runStatus.getString()); + JsonNumber inputBytes = object.getJsonNumber("inputBytes"); + assertEquals(9, inputBytes.intValue()); + JsonObject counterMap = object.getJsonObject("counters"); + assertNotNull(counterMap); + assertEquals(10, counterMap.getInt("counter1")); + assertEquals(5, counterMap.getInt("counter2")); + } + + /*********************************** + * Test component generator methods + ***********************************/ + public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix, int maxRecursion, int currentDepth) { Collection cStatus = new ArrayList<>(); @@ -229,6 +318,7 @@ public class TestSiteToSiteStatusReportingTask { pgStatus.setProcessGroupStatus(childPgStatus); pgStatus.setRemoteProcessGroupStatus(rpgStatus); pgStatus.setProcessorStatus(pStatus); + pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE); pgStatus.setActiveThreadCount(1); pgStatus.setBytesRead(2L); @@ -246,6 +336,7 @@ public class TestSiteToSiteStatusReportingTask { pgStatus.setOutputCount(13); pgStatus.setQueuedContentSize(14l); pgStatus.setQueuedCount(15); + pgStatus.setTerminatedThreadCount(1); return pgStatus; } @@ -263,6 +354,8 @@ public class TestSiteToSiteStatusReportingTask { pStatus.setInputCount(6); pStatus.setOutputBytes(7l); pStatus.setOutputCount(8); + pStatus.setRunStatus(RunStatus.Stopped); + pStatus.setTransmitting(false); return pStatus; } @@ -287,6 +380,12 @@ public class TestSiteToSiteStatusReportingTask { pStatus.setOutputCount(13); pStatus.setProcessingNanos(14l); pStatus.setType("type"); + pStatus.setTerminatedThreadCount(1); + pStatus.setRunStatus(RunStatus.Running); + pStatus.setCounters(new HashMap() {{ + put("counter1", 10L); + put("counter2", 5L); + }}); return pStatus; } @@ -304,6 +403,7 @@ public class TestSiteToSiteStatusReportingTask { rpgStatus.setSentContentSize(6l); rpgStatus.setSentCount(7); rpgStatus.setTargetUri("uri"); + rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting); return rpgStatus; } @@ -312,7 +412,7 @@ public class TestSiteToSiteStatusReportingTask { ConnectionStatus cStatus = new ConnectionStatus(); cStatus.setId(id); cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); - cStatus.setBackPressureBytesThreshold(0l); + cStatus.setBackPressureDataSizeThreshold("1 KB"); // sets backPressureBytesThreshold too cStatus.setBackPressureObjectThreshold(1l); cStatus.setInputBytes(2l); cStatus.setInputCount(3);