diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java index e4b556e4be..0f19fe6452 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java @@ -63,4 +63,32 @@ public interface EventAccess { */ List getFlowChanges(int firstActionId, final int maxActions); + /** + * Returns the total number of bytes read by this instance (at the root process group level, i.e. all events) since the instance started + * + * @return the total number of bytes read by this instance + */ + long getTotalBytesRead(); + + /** + * Returns the total number of bytes written by this instance (at the root process group level, i.e. all events) since the instance started + * + * @return the total number of bytes written by this instance + */ + long getTotalBytesWritten(); + + /** + * Returns the total number of bytes sent by this instance (at the root process group level) since the instance started + * + * @return the total number of bytes sent by this instance + */ + long getTotalBytesSent(); + + /** + * Returns the total number of bytes received by this instance (at the root process group level) since the instance started + * + * @return the total number of bytes received by this instance + */ + long getTotalBytesReceived(); + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java index b6cd7ade8b..05a70f8f6f 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java @@ -104,4 +104,19 @@ public class MockEventAccess implements EventAccess { this.flowChanges.add(action); } + public long getTotalBytesRead() { + return -1; + } + + public long getTotalBytesWritten() { + return -1; + } + + public long getTotalBytesSent() { + return -1; + } + + public long getTotalBytesReceived() { + return -1; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java index f6155770d5..2d3b5feba4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.prometheus.util; -import io.prometheus.client.Counter; import io.prometheus.client.Gauge; public class NiFiMetricsRegistry extends AbstractMetricsRegistry { @@ -54,19 +53,25 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry { .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(registry)); + nameToGaugeMap.put("TOTAL_BYTES_SENT", Gauge.build() + .name("nifi_total_bytes_sent") + .help("Running total number of bytes sent by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(registry)); + nameToGaugeMap.put("AMOUNT_BYTES_READ", Gauge.build() .name("nifi_amount_bytes_read") .help("Total number of bytes read by the component") .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(registry)); - nameToCounterMap.put("TOTAL_BYTES_READ", Counter.build().name("nifi_total_bytes_read") - .help("Total number of bytes read by the component") + nameToGaugeMap.put("TOTAL_BYTES_READ", Gauge.build().name("nifi_total_bytes_read") + .help("Running total number of bytes read by the component") .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(registry)); - nameToCounterMap.put("TOTAL_BYTES_WRITTEN", Counter.build().name("nifi_total_bytes_written") - .help("Total number of bytes written by the component") + nameToGaugeMap.put("TOTAL_BYTES_WRITTEN", Gauge.build().name("nifi_total_bytes_written") + .help("Running total number of bytes written by the component") .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(registry)); @@ -82,6 +87,12 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry { .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") .register(registry)); + nameToGaugeMap.put("TOTAL_BYTES_RECEIVED", Gauge.build() + .name("nifi_total_bytes_received") + .help("Running total number of bytes received by the component") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(registry)); + nameToGaugeMap.put("AMOUNT_BYTES_TRANSFERRED", Gauge.build() .name("nifi_amount_bytes_transferred") .help("Total number of Bytes transferred by the component") diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index b35b2554d2..e6bdf38e44 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -123,8 +123,6 @@ public class PrometheusMetricsUtil { nifiMetricsRegistry.setDataPoint(status.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId); - nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId); - nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getBytesTransferred(), "AMOUNT_BYTES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId); @@ -173,8 +171,6 @@ public class PrometheusMetricsUtil { nifiMetricsRegistry.setDataPoint(processorStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, procComponentType, procComponentName, procComponentId, parentId); nifiMetricsRegistry.setDataPoint(processorStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL", @@ -245,8 +241,6 @@ public class PrometheusMetricsUtil { nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT", @@ -271,8 +265,6 @@ public class PrometheusMetricsUtil { nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId); - nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId); nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT", diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java index be26cd86d7..d3a04999f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java @@ -51,4 +51,10 @@ public interface FlowFileEventRepository extends Closeable { * @param componentIdentifier Identifier of the component */ void purgeTransferEvents(String componentIdentifier); + + /** + * Reports aggregate metrics for all flowfile events + * @return a report of processing activity + */ + FlowFileEvent reportAggregateEvent(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index a8892fb9d6..fb9e8b1830 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3003,6 +3003,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node return result; } + public FlowFileEventRepository getFlowFileEventRepository() { + return flowFileEventRepository; + } + private static class HeartbeatBean { private final ProcessGroup rootGroup; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java index 774ced4131..f2640aaa21 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; public class RingBufferEventRepository implements FlowFileEventRepository { private final int numMinutes; + private final EventSumValue aggregateValues = new EventSumValue(0L); private final ConcurrentMap componentEventMap = new ConcurrentHashMap<>(); public RingBufferEventRepository(final int numMinutes) { @@ -40,6 +41,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository { public void updateRepository(final FlowFileEvent event, final String componentId) { final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes)); eventContainer.addEvent(event); + aggregateValues.add(event); } @Override @@ -61,4 +63,8 @@ public class RingBufferEventRepository implements FlowFileEventRepository { componentEventMap.remove(componentIdentifier); } + @Override + public FlowFileEvent reportAggregateEvent() { + return aggregateValues.toFlowFileEvent(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index 9e2ab49881..483c77612a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -744,4 +744,44 @@ public class StandardEventAccess implements UserAwareEventAccess { return status; } + + /** + * Returns the total number of bytes read by this instance (at the root process group level, i.e. all events) since the instance started + * + * @return the total number of bytes read by this instance + */ + @Override + public long getTotalBytesRead() { + return flowFileEventRepository.reportAggregateEvent().getBytesRead(); + } + + /** + * Returns the total number of bytes written by this instance (at the root process group level, i.e. all events) since the instance started + * + * @return the total number of bytes written by this instance + */ + @Override + public long getTotalBytesWritten() { + return flowFileEventRepository.reportAggregateEvent().getBytesWritten(); + } + + /** + * Returns the total number of bytes sent by this instance (at the root process group level) since the instance started + * + * @return the total number of bytes sent by this instance + */ + @Override + public long getTotalBytesSent() { + return flowFileEventRepository.reportAggregateEvent().getBytesSent(); + } + + /** + * Returns the total number of bytes received by this instance (at the root process group level) since the instance started + * + * @return the total number of bytes received by this instance + */ + @Override + public long getTotalBytesReceived() { + return flowFileEventRepository.reportAggregateEvent().getBytesReceived(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4fce249da9..929580d640 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -81,6 +81,8 @@ import org.apache.nifi.controller.Template; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.controller.repository.FlowFileEvent; +import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; @@ -152,6 +154,7 @@ import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.FlowDifferenceFilters; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; import org.apache.nifi.web.api.dto.AffectedComponentDTO; @@ -5376,10 +5379,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public Collection generateFlowMetrics() { - String instanceId = controllerFacade.getInstanceId(); + final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId(); ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root"); PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup", PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue()); + + // Add the total byte counts (read/written) to the NiFi metrics registry + FlowFileEventRepository flowFileEventRepository = controllerFacade.getFlowFileEventRepository(); + final String rootPGId = StringUtils.isEmpty(rootPGStatus.getId()) ? "" : rootPGStatus.getId(); + final String rootPGName = StringUtils.isEmpty(rootPGStatus.getName()) ? "" : rootPGStatus.getName(); + final FlowFileEvent aggregateEvent = flowFileEventRepository.reportAggregateEvent(); + nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesRead(), "TOTAL_BYTES_READ", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesWritten(), "TOTAL_BYTES_WRITTEN", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesSent(), "TOTAL_BYTES_SENT", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId); // Get Connection Status Analytics (predictions, e.g.) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 82f5a23aa4..be059891f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -48,6 +48,7 @@ import org.apache.nifi.controller.Template; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.repository.ContentNotFoundException; +import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -1645,6 +1646,10 @@ public class ControllerFacade implements Authorizable { return dtoFactory.createProcessorDiagnosticsDto(processor, processorStatus, bulletinRepository, flowController, serviceEntityFactory); } + public FlowFileEventRepository getFlowFileEventRepository() { + return flowController.getFlowFileEventRepository(); + } + /* * setters */ diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java index 774d036864..acf112885f 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java @@ -33,10 +33,12 @@ import org.apache.nifi.prometheus.util.JvmMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; import org.eclipse.jetty.server.Server; import java.net.InetSocketAddress; @@ -130,11 +132,28 @@ public class PrometheusReportingTask extends AbstractReportingTask { this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want); } Function nifiMetrics = (reportingContext) -> { - ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus(); + EventAccess eventAccess = reportingContext.getEventAccess(); + ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus(); String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue(); + if (instanceId == null) { + instanceId = ""; + } String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue(); NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry(); - return PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy); + CollectorRegistry collectorRegistry = PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy); + // Add the total byte counts (read/written) to the NiFi metrics registry + final String rootPGId = StringUtils.isEmpty(rootGroupStatus.getId()) ? "" : rootGroupStatus.getId(); + final String rootPGName = StringUtils.isEmpty(rootGroupStatus.getName()) ? "" : rootGroupStatus.getName(); + nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesRead(), "TOTAL_BYTES_READ", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesWritten(), "TOTAL_BYTES_WRITTEN", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesSent(), "TOTAL_BYTES_SENT", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesReceived(), "TOTAL_BYTES_RECEIVED", + instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + + return collectorRegistry; }; metricsCollectors.add(nifiMetrics); if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {