diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java index bafdfdb79b..c273ae805c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java @@ -20,6 +20,7 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import java.util.ArrayList; import java.util.List; public class MockBulletinRepository implements BulletinRepository { @@ -45,7 +46,7 @@ public class MockBulletinRepository implements BulletinRepository { @Override public List findBulletins(BulletinQuery bulletinQuery) { // TODO: Implement - return null; + return new ArrayList(); } @Override diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java index 12c65e7a87..9163ed9607 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java @@ -50,4 +50,13 @@ public class AbstractMetricsRegistry { counter.labels(labels).inc(val); } + + public void clear() { + for (Gauge gauge : nameToGaugeMap.values()) { + gauge.clear(); + } + for (Counter counter : nameToCounterMap.values()) { + counter.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index b0d6fef356..f52a5cb56e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -18,7 +18,6 @@ package org.apache.nifi.prometheus.util; import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.SimpleCollector; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.status.ConnectionStatus; @@ -34,8 +33,6 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -101,20 +98,6 @@ public class PrometheusMetricsUtil { final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId(); final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName(); - // Clear all collectors to deal with removed/renamed components -- for root PG only - if("RootProcessGroup".equals(componentType)) { - try { - for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) { - if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) { - SimpleCollector sc = (SimpleCollector) (field.get(null)); - sc.clear(); - } - } - } catch (IllegalAccessException e) { - // ignore - } - } - nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId); nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml index 7a13dd29ea..4c855b28f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml @@ -432,5 +432,10 @@ org.slf4j jcl-over-slf4j + + org.apache.nifi + nifi-mock + test + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index b22a417a1c..ee986dd6a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -5587,9 +5587,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public Collection generateFlowMetrics() { - final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId(); ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root"); + + nifiMetricsRegistry.clear(); PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup", PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy index a830a87755..3018830f42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy @@ -16,6 +16,8 @@ */ package org.apache.nifi.web +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.exporter.common.TextFormat import org.apache.nifi.authorization.AccessDeniedException import org.apache.nifi.authorization.AccessPolicy import org.apache.nifi.authorization.AuthorizableLookup @@ -30,9 +32,19 @@ import org.apache.nifi.authorization.resource.ResourceFactory import org.apache.nifi.authorization.user.NiFiUser import org.apache.nifi.authorization.user.NiFiUserDetails import org.apache.nifi.authorization.user.StandardNiFiUser +import org.apache.nifi.connectable.Connection +import org.apache.nifi.controller.flow.StandardFlowManager +import org.apache.nifi.controller.repository.FlowFileEvent +import org.apache.nifi.controller.repository.FlowFileEventRepository import org.apache.nifi.controller.service.ControllerServiceProvider +import org.apache.nifi.controller.status.PortStatus +import org.apache.nifi.controller.status.ProcessGroupStatus +import org.apache.nifi.controller.status.RunStatus +import org.apache.nifi.diagnostics.StorageUsage +import org.apache.nifi.diagnostics.SystemDiagnostics import org.apache.nifi.reporting.Bulletin import org.apache.nifi.reporting.BulletinRepository +import org.apache.nifi.util.MockBulletinRepository import org.apache.nifi.web.api.dto.AccessPolicyDTO import org.apache.nifi.web.api.dto.BulletinDTO import org.apache.nifi.web.api.dto.DtoFactory @@ -898,6 +910,151 @@ class StandardNiFiServiceFacadeSpec extends Specification { } + def "Test REST API Prometheus Metrics Endpoint"() { + given: + def serviceFacade = new StandardNiFiServiceFacade() + BulletinRepository bulletinRepository = new MockBulletinRepository() + serviceFacade.setBulletinRepository(bulletinRepository) + + ControllerFacade controllerFacade = Mock() + serviceFacade.setControllerFacade(controllerFacade) + controllerFacade.getInstanceId() >> "ABC" + controllerFacade.getMaxEventDrivenThreadCount() >> 1 + controllerFacade.getMaxTimerDrivenThreadCount() >> 10 + + // Setting up storage repositories + StorageUsage flowFileStorage = new StorageUsage() + flowFileStorage.setIdentifier("flowFile") + flowFileStorage.setTotalSpace(222) + flowFileStorage.setFreeSpace(111) + + StorageUsage contentStorage = new StorageUsage() + contentStorage.setIdentifier("default") + contentStorage.setTotalSpace(444) + contentStorage.setFreeSpace(111) + Map contentStorageMap = new HashMap<>() + contentStorageMap.put("default", contentStorage) + + StorageUsage provenanceStorage = new StorageUsage() + provenanceStorage.setIdentifier("default") + provenanceStorage.setTotalSpace(666) + provenanceStorage.setFreeSpace(111) + Map provenanceStorageMap = new HashMap<>() + provenanceStorageMap.put("default", provenanceStorage) + + // Setting up SystemDiagnostics + SystemDiagnostics systemDiagnostics = new SystemDiagnostics() + systemDiagnostics.setFlowFileRepositoryStorageUsage(flowFileStorage) + systemDiagnostics.setContentRepositoryStorageUsage(contentStorageMap) + systemDiagnostics.setProvenanceRepositoryStorageUsage(provenanceStorageMap) + + controllerFacade.getSystemDiagnostics() >> systemDiagnostics + + // Setting up flow + ProcessGroupStatus rootGroupStatus = new ProcessGroupStatus() + rootGroupStatus.setId("1234"); + rootGroupStatus.setFlowFilesReceived(5); + rootGroupStatus.setBytesReceived(10000); + rootGroupStatus.setFlowFilesSent(10); + rootGroupStatus.setBytesSent(20000); + rootGroupStatus.setQueuedCount(100); + rootGroupStatus.setQueuedContentSize(1024L); + rootGroupStatus.setBytesRead(60000L); + rootGroupStatus.setBytesWritten(80000L); + rootGroupStatus.setActiveThreadCount(5); + rootGroupStatus.setName("root"); + rootGroupStatus.setFlowFilesTransferred(5); + rootGroupStatus.setBytesTransferred(10000); + rootGroupStatus.setOutputContentSize(1000L); + rootGroupStatus.setInputContentSize(1000L); + rootGroupStatus.setOutputCount(100); + rootGroupStatus.setInputCount(1000); + + PortStatus outputPortStatus = new PortStatus(); + outputPortStatus.setId("9876"); + outputPortStatus.setName("out"); + outputPortStatus.setGroupId("1234"); + outputPortStatus.setRunStatus(RunStatus.Stopped); + outputPortStatus.setActiveThreadCount(1); + + rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus)); + // Create a nested group status + ProcessGroupStatus groupStatus2 = new ProcessGroupStatus(); + groupStatus2.setFlowFilesReceived(5); + groupStatus2.setBytesReceived(10000); + groupStatus2.setFlowFilesSent(10); + groupStatus2.setBytesSent(20000); + groupStatus2.setQueuedCount(100); + groupStatus2.setQueuedContentSize(1024L); + groupStatus2.setActiveThreadCount(2); + groupStatus2.setBytesRead(12345L); + groupStatus2.setBytesWritten(11111L); + groupStatus2.setFlowFilesTransferred(5); + groupStatus2.setBytesTransferred(10000); + groupStatus2.setOutputContentSize(1000L); + groupStatus2.setInputContentSize(1000L); + groupStatus2.setOutputCount(100); + groupStatus2.setInputCount(1000); + groupStatus2.setId("3378"); + groupStatus2.setName("nestedPG"); + Collection nestedGroupStatuses = new ArrayList<>(); + nestedGroupStatuses.add(groupStatus2); + rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses); + + // setting up flowFile events + controllerFacade.getProcessGroupStatus("root") >> rootGroupStatus + FlowFileEventRepository flowFileEventRepository = Mock() + controllerFacade.getFlowFileEventRepository() >> flowFileEventRepository + FlowFileEvent aggregateEvent = Mock() + flowFileEventRepository.reportAggregateEvent() >> aggregateEvent + + // setting up connections (empty list for testing) + Set connections = new HashSet() + StandardFlowManager flowManager = Mock() + controllerFacade.getFlowManager() >> flowManager + flowManager.findAllConnections() >> connections + + when: + Collection allRegistries = serviceFacade.generateFlowMetrics() + + // Converts metrics into a String for testing + Writer writer = new StringWriter(); + for (CollectorRegistry collectorRegistry : allRegistries) { + TextFormat.write004(writer, collectorRegistry.metricFamilySamples()); + } + String output = writer.toString(); + writer.close() + + // rename root group and generate metrics again to a different string + rootGroupStatus.setName("rootroot") + allRegistries = serviceFacade.generateFlowMetrics() + writer = new StringWriter() + for (CollectorRegistry collectorRegistry : allRegistries) { + TextFormat.write004(writer, collectorRegistry.metricFamilySamples()) + } + String output2 = writer.toString() + writer.close() + + then: + // flow metrics + output.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"); + output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"); + output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0"); + + // jvm + output.contains("nifi_jvm_heap_used{instance=\"ABC\",}") + output.contains("# HELP nifi_jvm_heap_used NiFi JVM heap used") + output.contains("# TYPE nifi_jvm_heap_used gauge") + output.contains("nifi_jvm_thread_count{instance=\"ABC\",}") + + // test that renamed items are in the metrics output and that the previously named versions have been removed from the metrics output. + output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"); + output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"); + !output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"); + !output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"); + + } + private UserGroupDTO createUserGroupDTO() { new UserGroupDTO(id: 'group-1', name: 'test group', users: [createUserEntity()] as Set) }