From d2aec89738409c731328b9a1e5faad626c0b8e65 Mon Sep 17 00:00:00 2001 From: timeabarna <38653567+timeabarna@users.noreply.github.com> Date: Tue, 10 Oct 2023 20:55:33 +0200 Subject: [PATCH] NIFI-12166 Add repo usage to the monitoring endpoint (#7836) * NIFI-12166 Add repo usage to the monitoring endpoint --- .../apache/nifi/diagnostics/StorageUsage.java | 0 .../apache/nifi/reporting/EventAccess.java | 21 +++++ .../org/apache/nifi/util/MockEventAccess.java | 18 ++++ .../prometheus/util/NiFiMetricsRegistry.java | 45 ++++++++++ .../util/PrometheusMetricsUtil.java | 32 +++++++ .../nifi/controller/FlowController.java | 3 +- .../nifi/reporting/StandardEventAccess.java | 90 ++++++++++++++++++- .../nifi/web/StandardNiFiServiceFacade.java | 9 ++ .../nifi/web/controller/ControllerFacade.java | 25 ++++++ .../prometheus/TestPrometheusMetricsUtil.java | 58 ++++++++++++ .../reporting/StatelessEventAccess.java | 17 ++++ 11 files changed, 316 insertions(+), 2 deletions(-) rename {nifi-framework-api => nifi-api}/src/main/java/org/apache/nifi/diagnostics/StorageUsage.java (100%) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StorageUsage.java b/nifi-api/src/main/java/org/apache/nifi/diagnostics/StorageUsage.java similarity index 100% rename from nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StorageUsage.java rename to nifi-api/src/main/java/org/apache/nifi/diagnostics/StorageUsage.java diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java index 0f19fe6452..43ef613d95 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java @@ -18,8 +18,11 @@ package org.apache.nifi.reporting; import java.io.IOException; import java.util.List; +import java.util.Map; + import org.apache.nifi.action.Action; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; @@ -91,4 +94,22 @@ public interface EventAccess { */ long getTotalBytesReceived(); + /** + * Returns the storage usage of all provenance repositories + * @return the map of all the storage usage + */ + Map getProvenanceRepositoryStorageUsage(); + + /** + * Returns the storage usage of all content repositories + * @return the map of all the storage usage + */ + Map getContentRepositoryStorageUsage(); + + /** + * Returns the storage usage of the flow file repository + * @return the storage usage + */ + StorageUsage getFlowFileRepositoryStorageUsage(); + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java index 05a70f8f6f..4fcdf05bef 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java @@ -18,11 +18,14 @@ package org.apache.nifi.util; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.nifi.action.Action; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.reporting.EventAccess; @@ -119,4 +122,19 @@ public class MockEventAccess implements EventAccess { public long getTotalBytesReceived() { return -1; } + + @Override + public Map getProvenanceRepositoryStorageUsage() { + return Collections.emptyMap(); + } + + @Override + public Map getContentRepositoryStorageUsage() { + return Collections.emptyMap(); + } + + @Override + public StorageUsage getFlowFileRepositoryStorageUsage() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java index 97eca61a8c..cc3ff65eaa 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java @@ -230,5 +230,50 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry { .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "source_id", "source_name", "destination_id", "destination_name") .register(registry)); + + nameToGaugeMap.put("FLOW_FILE_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_total_space_bytes") + .help("Flowfile repository total space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("FLOW_FILE_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_used_space_bytes") + .help("Flowfile repository used space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("FLOW_FILE_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_free_space_bytes") + .help("Flowfile repository free space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("CONTENT_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_content_repo_total_space_bytes") + .help("Content repository total space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("CONTENT_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_content_repo_used_space_bytes") + .help("Content repository used space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("CONTENT_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_content_repo_free_space_bytes") + .help("Content repository free space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("PROVENANCE_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_total_space_bytes") + .help("Provenance repository total space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("PROVENANCE_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_used_space_bytes") + .help("Provenance repository used space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); + + nameToGaugeMap.put("PROVENANCE_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_free_space_bytes") + .help("Provenance repository free space in bytes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier") + .register(registry)); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index 998ee2cf15..c5ac134f73 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.metrics.jvm.JvmMetrics; import org.apache.nifi.processor.DataUnit; @@ -523,4 +524,35 @@ public class PrometheusMetricsUtil { return clusterMetricsRegistry.getRegistry(); } + + public static CollectorRegistry createStorageUsageMetrics(final NiFiMetricsRegistry nifiMetricsRegistry, final StorageUsage flowFileRepositoryUsage, + final Map contentRepositoryUsage, final Map provenanceRepositoryUsage, + final String instanceId, final String componentType, final String componentName, final String componentId, final String parentId) { + + addStorageUsageMetric(nifiMetricsRegistry, flowFileRepositoryUsage, instanceId, componentType, componentName, componentId, parentId, + "FLOW_FILE_REPO_TOTAL_SPACE_BYTES", "FLOW_FILE_REPO_FREE_SPACE_BYTES", "FLOW_FILE_REPO_USED_SPACE_BYTES"); + + for (final StorageUsage usage : contentRepositoryUsage.values()) { + addStorageUsageMetric(nifiMetricsRegistry, usage, instanceId, componentType, componentName, componentId, parentId, + "CONTENT_REPO_TOTAL_SPACE_BYTES", "CONTENT_REPO_FREE_SPACE_BYTES", "CONTENT_REPO_USED_SPACE_BYTES"); + } + + for (final StorageUsage usage : provenanceRepositoryUsage.values()) { + addStorageUsageMetric(nifiMetricsRegistry, usage, instanceId, componentType, componentName, componentId, parentId, + "PROVENANCE_REPO_TOTAL_SPACE_BYTES", "PROVENANCE_REPO_FREE_SPACE_BYTES", "PROVENANCE_REPO_USED_SPACE_BYTES"); + } + + return nifiMetricsRegistry.getRegistry(); + } + + private static void addStorageUsageMetric(final NiFiMetricsRegistry nifiMetricsRegistry, final StorageUsage storageUsage, final String instanceId, + final String componentType, final String componentName, final String componentId, final String parentId, + final String totalSpaceLabel, final String freeSpaceLabel, final String usedSpaceLabel) { + nifiMetricsRegistry.setDataPoint(storageUsage.getTotalSpace(), totalSpaceLabel, + instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier()); + nifiMetricsRegistry.setDataPoint(storageUsage.getFreeSpace(), freeSpaceLabel, + instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier()); + nifiMetricsRegistry.setDataPoint(storageUsage.getUsedSpace(), usedSpaceLabel, + instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier()); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 48d738201a..5fee9763a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -756,7 +756,8 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } - eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, auditService, analyticsEngine); + eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, + auditService, analyticsEngine, flowFileRepository, contentRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index a38f49475e..4d1abc0e42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -25,19 +25,26 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.FlowFileEvent; import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.history.History; import org.apache.nifi.provenance.ProvenanceRepository; +import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.Predicate; public class StandardEventAccess extends AbstractEventAccess implements UserAwareEventAccess { @@ -46,15 +53,20 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar private final Authorizer authorizer; private final ProvenanceRepository provenanceRepository; private final AuditService auditService; + private final FlowFileRepository flowFileRepository; + private final ContentRepository contentRepository; public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRepository flowFileEventRepository, final ProcessScheduler processScheduler, - final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine) { + final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine, + final FlowFileRepository flowFileRepository, final ContentRepository contentRepository) { super(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository); this.flowFileEventRepository = flowFileEventRepository; this.flowManager = flowManager; this.authorizer = authorizer; this.provenanceRepository = provenanceRepository; this.auditService = auditService; + this.flowFileRepository = flowFileRepository; + this.contentRepository = contentRepository; } @@ -146,4 +158,80 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar // on demand status request for a specific user... require authorization per component and filter results as appropriate return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), recursiveStatusDepth, 1, false); } + + /** + * Returns the storage usage of all provenance repositories + * @return the map of all the storage usage + */ + @Override + public Map getProvenanceRepositoryStorageUsage() { + final Set provContainerNames = provenanceRepository.getContainerNames(); + final Map provRepositoryUsage = new LinkedHashMap<>(provContainerNames.size()); + for (final String containerName : provContainerNames) { + long containerFree; + long containerCapacity; + + try { + containerFree = provenanceRepository.getContainerUsableSpace(containerName); + containerCapacity = provenanceRepository.getContainerCapacity(containerName); + } catch (final IOException ioe) { + containerFree = 0L; + containerCapacity = -1L; + } + + final StorageUsage storageUsage = new StorageUsage(); + storageUsage.setIdentifier(containerName); + storageUsage.setFreeSpace(containerFree); + storageUsage.setTotalSpace(containerCapacity); + provRepositoryUsage.put(containerName, storageUsage); + } + return provRepositoryUsage; + } + + /** + * Returns the storage usage of all content repositories + * @return the map of all the storage usage + */ + @Override + public Map getContentRepositoryStorageUsage() { + final Set containerNames = contentRepository.getContainerNames(); + final Map fileRepositoryUsage = new LinkedHashMap<>(containerNames.size()); + for (final String containerName : containerNames) { + long containerFree; + long containerCapacity; + + try { + containerFree = contentRepository.getContainerUsableSpace(containerName); + containerCapacity = contentRepository.getContainerCapacity(containerName); + } catch (final IOException ioe) { + containerFree = 0L; + containerCapacity = -1L; + } + + final StorageUsage storageUsage = new StorageUsage(); + storageUsage.setIdentifier(containerName); + storageUsage.setFreeSpace(containerFree); + storageUsage.setTotalSpace(containerCapacity); + fileRepositoryUsage.put(containerName, storageUsage); + } + return fileRepositoryUsage; + } + + /** + * Returns the storage usage of the flow file repository + * @return the storage usage + */ + @Override + public StorageUsage getFlowFileRepositoryStorageUsage() { + final StorageUsage flowFileRepoStorageUsage = new StorageUsage(); + flowFileRepoStorageUsage.setIdentifier("FlowFile Repository"); + try { + flowFileRepoStorageUsage.setFreeSpace(flowFileRepository.getUsableStorageSpace()); + flowFileRepoStorageUsage.setTotalSpace(flowFileRepository.getStorageCapacity()); + } catch (final IOException ioe) { + flowFileRepoStorageUsage.setFreeSpace(0L); + flowFileRepoStorageUsage.setTotalSpace(-1L); + } + return flowFileRepoStorageUsage; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index eba0110932..774265236f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -101,6 +101,7 @@ import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; import org.apache.nifi.diagnostics.DiagnosticLevel; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -6128,6 +6129,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED", instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); + //Add flow file repository, content repository and provenance repository usage to NiFi metrics + final StorageUsage flowFileRepositoryUsage = controllerFacade.getFlowFileRepositoryStorageUsage(); + final Map contentRepositoryUsage = controllerFacade.getContentRepositoryStorageUsage(); + final Map provenanceRepositoryUsage = controllerFacade.getProvenanceRepositoryStorageUsage(); + + PrometheusMetricsUtil.createStorageUsageMetrics(nifiMetricsRegistry, flowFileRepositoryUsage, contentRepositoryUsage, provenanceRepositoryUsage, + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); + //Add total task duration for root to the NiFi metrics registry // The latest aggregated status history is the last element in the list so we need the last element only final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 2afa7cb7d8..1b1dc3e1b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -62,6 +62,7 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.controller.status.history.StatusHistoryRepository; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.flowanalysis.FlowAnalysisRule; import org.apache.nifi.flow.VersionedProcessGroup; @@ -1735,6 +1736,30 @@ public class ControllerFacade implements Authorizable { return flowController.getFlowFileEventRepository(); } + /** + * Returns the storage usage of all provenance repositories + * @return the map of all the storage usage + */ + public Map getProvenanceRepositoryStorageUsage() { + return flowController.getEventAccess().getProvenanceRepositoryStorageUsage(); + } + + /** + * Returns the storage usage of all content repositories + * @return the map of all the storage usage + */ + public Map getContentRepositoryStorageUsage() { + return flowController.getEventAccess().getContentRepositoryStorageUsage(); + } + + /** + * Returns the storage usage of the flow file repository + * @return the storage usage + */ + public StorageUsage getFlowFileRepositoryStorageUsage() { + return flowController.getEventAccess().getFlowFileRepositoryStorageUsage(); + } + /* * setters */ diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java index c0899cef63..281311e3c6 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java @@ -18,6 +18,7 @@ package org.apache.nifi.reporting.prometheus; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.prometheus.util.AbstractMetricsRegistry; import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; @@ -65,6 +66,13 @@ public class TestPrometheusMetricsUtil { private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis"; private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis"; + private static final String METRIC_NAME_SEGMENT_FOR_REPOSITORIES = "repo"; + private static final String LABEL_NAME_FOR_REPO_IDENTIFIER = "repo_identifier"; + private static final String FLOW_FILE_REPO_IDENTIFIER = "flowFileRepo"; + private static final String CONTENT_REPO_IDENTIFIER_ONE = "contentRepo1"; + private static final String CONTENT_REPO_IDENTIFIER_TWO = "contentRepo2"; + private static final String PROVENANCE_REPO_IDENTIFIER = "provenanceRepo"; + private static ProcessGroupStatus singleProcessGroupStatus; private static ProcessGroupStatus nestedProcessGroupStatus; private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure; @@ -304,6 +312,21 @@ public class TestPrometheusMetricsUtil { assertThat(sampleValues, hasItems(EXPECTED_NESTED_BYTES_PERCENT_VALUE, EXPECTED_NESTED_COUNT_PERCENT_VALUE)); } + @Test + public void testStorageUsageAddedToNifiMetrics() { + final NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry(); + final StorageUsage floeFileRepositoryUsage = createFloFileRepositoryUsage(); + final Map contentRepositoryUsage = createContentRepositoryUsage(); + final Map provenanceRepositoryUsage = createProvenanceRepositoryUsage(); + + PrometheusMetricsUtil.createStorageUsageMetrics(niFiMetricsRegistry, floeFileRepositoryUsage, contentRepositoryUsage, provenanceRepositoryUsage, + EMPTY, EMPTY, EMPTY, EMPTY, EMPTY); + final Set result = getRepoIdentifierSampleLabelNames(niFiMetricsRegistry); + + assertEquals(4, result.size()); + assertThat(result, hasItems(FLOW_FILE_REPO_IDENTIFIER, CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO, PROVENANCE_REPO_IDENTIFIER)); + } + private static ProcessGroupStatus createSingleProcessGroupStatus(final long queuedBytes, final long bytesThreshold, final int queuedCount, final long objectThreshold) { ProcessGroupStatus singleStatus = new ProcessGroupStatus(); List connectionStatuses = new ArrayList<>(); @@ -403,4 +426,39 @@ public class TestPrometheusMetricsUtil { PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, getPredictions(predictions, connection)); } } + + private StorageUsage createFloFileRepositoryUsage() { + return createStorageUsage(FLOW_FILE_REPO_IDENTIFIER); + } + + private Map createContentRepositoryUsage() { + return createStorageUsages(CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO); + } + + private Map createProvenanceRepositoryUsage() { + return createStorageUsages(PROVENANCE_REPO_IDENTIFIER); + } + + private StorageUsage createStorageUsage(final String repoIdentifier) { + final StorageUsage storageUsage = new StorageUsage(); + storageUsage.setFreeSpace(1L); + storageUsage.setTotalSpace(2L); + storageUsage.setIdentifier(repoIdentifier); + return storageUsage; + } + + private Map createStorageUsages(final String ... repoIdentifier) { + final Map storageUsageMap = new HashMap<>(); + for (final String repoName : repoIdentifier) { + storageUsageMap.put(repoName, createStorageUsage(repoName)); + } + return storageUsageMap; + } + + private Set getRepoIdentifierSampleLabelNames(final AbstractMetricsRegistry metricsRegistry) { + return Collections.list(metricsRegistry.getRegistry().filteredMetricFamilySamples(e -> e.contains(METRIC_NAME_SEGMENT_FOR_REPOSITORIES))) + .stream().flatMap(f -> f.samples.stream()) + .map(s -> s.labelValues.get(s.labelNames.indexOf(LABEL_NAME_FOR_REPO_IDENTIFIER))) + .collect(Collectors.toSet()); + } } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessEventAccess.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessEventAccess.java index 2aef7da86f..9f59c3cc6c 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessEventAccess.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessEventAccess.java @@ -22,12 +22,14 @@ import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; +import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.reporting.AbstractEventAccess; import java.util.Collections; import java.util.List; +import java.util.Map; public class StatelessEventAccess extends AbstractEventAccess { private final ProvenanceRepository provenanceRepository; @@ -47,4 +49,19 @@ public class StatelessEventAccess extends AbstractEventAccess { public List getFlowChanges(final int firstActionId, final int maxActions) { return Collections.emptyList(); } + + @Override + public Map getProvenanceRepositoryStorageUsage() { + return Collections.emptyMap(); + } + + @Override + public Map getContentRepositoryStorageUsage() { + return Collections.emptyMap(); + } + + @Override + public StorageUsage getFlowFileRepositoryStorageUsage() { + return null; + } }