NIFI-12166 Add repo usage to the monitoring endpoint (#7836)

* NIFI-12166 Add repo usage to the monitoring endpoint
This commit is contained in:
timeabarna 2023-10-10 20:55:33 +02:00 committed by GitHub
parent 5a42d7245b
commit d2aec89738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 316 additions and 2 deletions

View File

@ -18,8 +18,11 @@ package org.apache.nifi.reporting;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -91,4 +94,22 @@ public interface EventAccess {
*/
long getTotalBytesReceived();
/**
* Returns the storage usage of all provenance repositories
* @return the map of all the storage usage
*/
Map<String, StorageUsage> getProvenanceRepositoryStorageUsage();
/**
* Returns the storage usage of all content repositories
* @return the map of all the storage usage
*/
Map<String, StorageUsage> getContentRepositoryStorageUsage();
/**
* Returns the storage usage of the flow file repository
* @return the storage usage
*/
StorageUsage getFlowFileRepositoryStorageUsage();
}

View File

@ -18,11 +18,14 @@ package org.apache.nifi.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.action.Action;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.reporting.EventAccess;
@ -119,4 +122,19 @@ public class MockEventAccess implements EventAccess {
public long getTotalBytesReceived() {
return -1;
}
@Override
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
return Collections.emptyMap();
}
@Override
public Map<String, StorageUsage> getContentRepositoryStorageUsage() {
return Collections.emptyMap();
}
@Override
public StorageUsage getFlowFileRepositoryStorageUsage() {
return null;
}
}

View File

@ -230,5 +230,50 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("FLOW_FILE_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_total_space_bytes")
.help("Flowfile repository total space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("FLOW_FILE_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_used_space_bytes")
.help("Flowfile repository used space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("FLOW_FILE_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_flow_file_repo_free_space_bytes")
.help("Flowfile repository free space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("CONTENT_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_content_repo_total_space_bytes")
.help("Content repository total space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("CONTENT_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_content_repo_used_space_bytes")
.help("Content repository used space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("CONTENT_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_content_repo_free_space_bytes")
.help("Content repository free space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("PROVENANCE_REPO_TOTAL_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_total_space_bytes")
.help("Provenance repository total space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("PROVENANCE_REPO_USED_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_used_space_bytes")
.help("Provenance repository used space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
nameToGaugeMap.put("PROVENANCE_REPO_FREE_SPACE_BYTES", Gauge.build().name("nifi_provenance_repo_free_space_bytes")
.help("Provenance repository free space in bytes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "repo_identifier")
.register(registry));
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
@ -523,4 +524,35 @@ public class PrometheusMetricsUtil {
return clusterMetricsRegistry.getRegistry();
}
public static CollectorRegistry createStorageUsageMetrics(final NiFiMetricsRegistry nifiMetricsRegistry, final StorageUsage flowFileRepositoryUsage,
final Map<String, StorageUsage> contentRepositoryUsage, final Map<String, StorageUsage> provenanceRepositoryUsage,
final String instanceId, final String componentType, final String componentName, final String componentId, final String parentId) {
addStorageUsageMetric(nifiMetricsRegistry, flowFileRepositoryUsage, instanceId, componentType, componentName, componentId, parentId,
"FLOW_FILE_REPO_TOTAL_SPACE_BYTES", "FLOW_FILE_REPO_FREE_SPACE_BYTES", "FLOW_FILE_REPO_USED_SPACE_BYTES");
for (final StorageUsage usage : contentRepositoryUsage.values()) {
addStorageUsageMetric(nifiMetricsRegistry, usage, instanceId, componentType, componentName, componentId, parentId,
"CONTENT_REPO_TOTAL_SPACE_BYTES", "CONTENT_REPO_FREE_SPACE_BYTES", "CONTENT_REPO_USED_SPACE_BYTES");
}
for (final StorageUsage usage : provenanceRepositoryUsage.values()) {
addStorageUsageMetric(nifiMetricsRegistry, usage, instanceId, componentType, componentName, componentId, parentId,
"PROVENANCE_REPO_TOTAL_SPACE_BYTES", "PROVENANCE_REPO_FREE_SPACE_BYTES", "PROVENANCE_REPO_USED_SPACE_BYTES");
}
return nifiMetricsRegistry.getRegistry();
}
private static void addStorageUsageMetric(final NiFiMetricsRegistry nifiMetricsRegistry, final StorageUsage storageUsage, final String instanceId,
final String componentType, final String componentName, final String componentId, final String parentId,
final String totalSpaceLabel, final String freeSpaceLabel, final String usedSpaceLabel) {
nifiMetricsRegistry.setDataPoint(storageUsage.getTotalSpace(), totalSpaceLabel,
instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier());
nifiMetricsRegistry.setDataPoint(storageUsage.getFreeSpace(), freeSpaceLabel,
instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier());
nifiMetricsRegistry.setDataPoint(storageUsage.getUsedSpace(), usedSpaceLabel,
instanceId, componentType, componentName, componentId, parentId, storageUsage.getIdentifier());
}
}

View File

@ -756,7 +756,8 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
}
eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, auditService, analyticsEngine);
eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository,
auditService, analyticsEngine, flowFileRepository, contentRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override

View File

@ -25,19 +25,26 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.provenance.ProvenanceRepository;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
public class StandardEventAccess extends AbstractEventAccess implements UserAwareEventAccess {
@ -46,15 +53,20 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar
private final Authorizer authorizer;
private final ProvenanceRepository provenanceRepository;
private final AuditService auditService;
private final FlowFileRepository flowFileRepository;
private final ContentRepository contentRepository;
public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRepository flowFileEventRepository, final ProcessScheduler processScheduler,
final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine) {
final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine,
final FlowFileRepository flowFileRepository, final ContentRepository contentRepository) {
super(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository);
this.flowFileEventRepository = flowFileEventRepository;
this.flowManager = flowManager;
this.authorizer = authorizer;
this.provenanceRepository = provenanceRepository;
this.auditService = auditService;
this.flowFileRepository = flowFileRepository;
this.contentRepository = contentRepository;
}
@ -146,4 +158,80 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar
// on demand status request for a specific user... require authorization per component and filter results as appropriate
return getGroupStatus(group, statusReport, authorizable -> authorizable.isAuthorized(this.authorizer, RequestAction.READ, user), recursiveStatusDepth, 1, false);
}
/**
* Returns the storage usage of all provenance repositories
* @return the map of all the storage usage
*/
@Override
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
final Set<String> provContainerNames = provenanceRepository.getContainerNames();
final Map<String, StorageUsage> provRepositoryUsage = new LinkedHashMap<>(provContainerNames.size());
for (final String containerName : provContainerNames) {
long containerFree;
long containerCapacity;
try {
containerFree = provenanceRepository.getContainerUsableSpace(containerName);
containerCapacity = provenanceRepository.getContainerCapacity(containerName);
} catch (final IOException ioe) {
containerFree = 0L;
containerCapacity = -1L;
}
final StorageUsage storageUsage = new StorageUsage();
storageUsage.setIdentifier(containerName);
storageUsage.setFreeSpace(containerFree);
storageUsage.setTotalSpace(containerCapacity);
provRepositoryUsage.put(containerName, storageUsage);
}
return provRepositoryUsage;
}
/**
* Returns the storage usage of all content repositories
* @return the map of all the storage usage
*/
@Override
public Map<String, StorageUsage> getContentRepositoryStorageUsage() {
final Set<String> containerNames = contentRepository.getContainerNames();
final Map<String, StorageUsage> fileRepositoryUsage = new LinkedHashMap<>(containerNames.size());
for (final String containerName : containerNames) {
long containerFree;
long containerCapacity;
try {
containerFree = contentRepository.getContainerUsableSpace(containerName);
containerCapacity = contentRepository.getContainerCapacity(containerName);
} catch (final IOException ioe) {
containerFree = 0L;
containerCapacity = -1L;
}
final StorageUsage storageUsage = new StorageUsage();
storageUsage.setIdentifier(containerName);
storageUsage.setFreeSpace(containerFree);
storageUsage.setTotalSpace(containerCapacity);
fileRepositoryUsage.put(containerName, storageUsage);
}
return fileRepositoryUsage;
}
/**
* Returns the storage usage of the flow file repository
* @return the storage usage
*/
@Override
public StorageUsage getFlowFileRepositoryStorageUsage() {
final StorageUsage flowFileRepoStorageUsage = new StorageUsage();
flowFileRepoStorageUsage.setIdentifier("FlowFile Repository");
try {
flowFileRepoStorageUsage.setFreeSpace(flowFileRepository.getUsableStorageSpace());
flowFileRepoStorageUsage.setTotalSpace(flowFileRepository.getStorageCapacity());
} catch (final IOException ioe) {
flowFileRepoStorageUsage.setFreeSpace(0L);
flowFileRepoStorageUsage.setTotalSpace(-1L);
}
return flowFileRepoStorageUsage;
}
}

View File

@ -101,6 +101,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.diagnostics.DiagnosticLevel;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -6128,6 +6129,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
//Add flow file repository, content repository and provenance repository usage to NiFi metrics
final StorageUsage flowFileRepositoryUsage = controllerFacade.getFlowFileRepositoryStorageUsage();
final Map<String, StorageUsage> contentRepositoryUsage = controllerFacade.getContentRepositoryStorageUsage();
final Map<String, StorageUsage> provenanceRepositoryUsage = controllerFacade.getProvenanceRepositoryStorageUsage();
PrometheusMetricsUtil.createStorageUsageMetrics(nifiMetricsRegistry, flowFileRepositoryUsage, contentRepositoryUsage, provenanceRepositoryUsage,
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
//Add total task duration for root to the NiFi metrics registry
// The latest aggregated status history is the last element in the list so we need the last element only
final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);

View File

@ -62,6 +62,7 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flow.VersionedProcessGroup;
@ -1735,6 +1736,30 @@ public class ControllerFacade implements Authorizable {
return flowController.getFlowFileEventRepository();
}
/**
* Returns the storage usage of all provenance repositories
* @return the map of all the storage usage
*/
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
return flowController.getEventAccess().getProvenanceRepositoryStorageUsage();
}
/**
* Returns the storage usage of all content repositories
* @return the map of all the storage usage
*/
public Map<String, StorageUsage> getContentRepositoryStorageUsage() {
return flowController.getEventAccess().getContentRepositoryStorageUsage();
}
/**
* Returns the storage usage of the flow file repository
* @return the storage usage
*/
public StorageUsage getFlowFileRepositoryStorageUsage() {
return flowController.getEventAccess().getFlowFileRepositoryStorageUsage();
}
/*
* setters
*/

View File

@ -18,6 +18,7 @@ package org.apache.nifi.reporting.prometheus;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
@ -65,6 +66,13 @@ public class TestPrometheusMetricsUtil {
private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
private static final String METRIC_NAME_SEGMENT_FOR_REPOSITORIES = "repo";
private static final String LABEL_NAME_FOR_REPO_IDENTIFIER = "repo_identifier";
private static final String FLOW_FILE_REPO_IDENTIFIER = "flowFileRepo";
private static final String CONTENT_REPO_IDENTIFIER_ONE = "contentRepo1";
private static final String CONTENT_REPO_IDENTIFIER_TWO = "contentRepo2";
private static final String PROVENANCE_REPO_IDENTIFIER = "provenanceRepo";
private static ProcessGroupStatus singleProcessGroupStatus;
private static ProcessGroupStatus nestedProcessGroupStatus;
private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
@ -304,6 +312,21 @@ public class TestPrometheusMetricsUtil {
assertThat(sampleValues, hasItems(EXPECTED_NESTED_BYTES_PERCENT_VALUE, EXPECTED_NESTED_COUNT_PERCENT_VALUE));
}
@Test
public void testStorageUsageAddedToNifiMetrics() {
final NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
final StorageUsage floeFileRepositoryUsage = createFloFileRepositoryUsage();
final Map<String, StorageUsage> contentRepositoryUsage = createContentRepositoryUsage();
final Map<String, StorageUsage> provenanceRepositoryUsage = createProvenanceRepositoryUsage();
PrometheusMetricsUtil.createStorageUsageMetrics(niFiMetricsRegistry, floeFileRepositoryUsage, contentRepositoryUsage, provenanceRepositoryUsage,
EMPTY, EMPTY, EMPTY, EMPTY, EMPTY);
final Set<String> result = getRepoIdentifierSampleLabelNames(niFiMetricsRegistry);
assertEquals(4, result.size());
assertThat(result, hasItems(FLOW_FILE_REPO_IDENTIFIER, CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO, PROVENANCE_REPO_IDENTIFIER));
}
private static ProcessGroupStatus createSingleProcessGroupStatus(final long queuedBytes, final long bytesThreshold, final int queuedCount, final long objectThreshold) {
ProcessGroupStatus singleStatus = new ProcessGroupStatus();
List<ConnectionStatus> connectionStatuses = new ArrayList<>();
@ -403,4 +426,39 @@ public class TestPrometheusMetricsUtil {
PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, getPredictions(predictions, connection));
}
}
private StorageUsage createFloFileRepositoryUsage() {
return createStorageUsage(FLOW_FILE_REPO_IDENTIFIER);
}
private Map<String, StorageUsage> createContentRepositoryUsage() {
return createStorageUsages(CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO);
}
private Map<String, StorageUsage> createProvenanceRepositoryUsage() {
return createStorageUsages(PROVENANCE_REPO_IDENTIFIER);
}
private StorageUsage createStorageUsage(final String repoIdentifier) {
final StorageUsage storageUsage = new StorageUsage();
storageUsage.setFreeSpace(1L);
storageUsage.setTotalSpace(2L);
storageUsage.setIdentifier(repoIdentifier);
return storageUsage;
}
private Map<String, StorageUsage> createStorageUsages(final String ... repoIdentifier) {
final Map<String, StorageUsage> storageUsageMap = new HashMap<>();
for (final String repoName : repoIdentifier) {
storageUsageMap.put(repoName, createStorageUsage(repoName));
}
return storageUsageMap;
}
private Set<String> getRepoIdentifierSampleLabelNames(final AbstractMetricsRegistry metricsRegistry) {
return Collections.list(metricsRegistry.getRegistry().filteredMetricFamilySamples(e -> e.contains(METRIC_NAME_SEGMENT_FOR_REPOSITORIES)))
.stream().flatMap(f -> f.samples.stream())
.map(s -> s.labelValues.get(s.labelNames.indexOf(LABEL_NAME_FOR_REPO_IDENTIFIER)))
.collect(Collectors.toSet());
}
}

View File

@ -22,12 +22,14 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.AbstractEventAccess;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class StatelessEventAccess extends AbstractEventAccess {
private final ProvenanceRepository provenanceRepository;
@ -47,4 +49,19 @@ public class StatelessEventAccess extends AbstractEventAccess {
public List<Action> getFlowChanges(final int firstActionId, final int maxActions) {
return Collections.emptyList();
}
@Override
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
return Collections.emptyMap();
}
@Override
public Map<String, StorageUsage> getContentRepositoryStorageUsage() {
return Collections.emptyMap();
}
@Override
public StorageUsage getFlowFileRepositoryStorageUsage() {
return null;
}
}