NIFI-8272 Delete stale metrics from REST API Prometheus endpoint.

Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected imports.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5447
This commit is contained in:
noblenumbat360 2021-10-08 01:37:42 +11:00 committed by Matthew Burgess
parent 8491486181
commit f410c8df0a
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
6 changed files with 175 additions and 19 deletions

View File

@ -20,6 +20,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import java.util.ArrayList;
import java.util.List;
public class MockBulletinRepository implements BulletinRepository {
@ -45,7 +46,7 @@ public class MockBulletinRepository implements BulletinRepository {
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
// TODO: Implement
return null;
return new ArrayList<Bulletin>();
}
@Override

View File

@ -50,4 +50,13 @@ public class AbstractMetricsRegistry {
counter.labels(labels).inc(val);
}
public void clear() {
for (Gauge gauge : nameToGaugeMap.values()) {
gauge.clear();
}
for (Counter counter : nameToCounterMap.values()) {
counter.clear();
}
}
}

View File

@ -18,7 +18,6 @@
package org.apache.nifi.prometheus.util;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
@ -34,8 +33,6 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -101,20 +98,6 @@ public class PrometheusMetricsUtil {
final String componentId = StringUtils.isEmpty(status.getId()) ? DEFAULT_LABEL_STRING : status.getId();
final String componentName = StringUtils.isEmpty(status.getName()) ? DEFAULT_LABEL_STRING : status.getName();
// Clear all collectors to deal with removed/renamed components -- for root PG only
if("RootProcessGroup".equals(componentType)) {
try {
for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) {
SimpleCollector<?> sc = (SimpleCollector<?>) (field.get(null));
sc.clear();
}
}
} catch (IllegalAccessException e) {
// ignore
}
}
nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);

View File

@ -432,5 +432,10 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -5587,9 +5587,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public Collection<CollectorRegistry> generateFlowMetrics() {
final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId();
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
nifiMetricsRegistry.clear();
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup",
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.web
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.common.TextFormat
import org.apache.nifi.authorization.AccessDeniedException
import org.apache.nifi.authorization.AccessPolicy
import org.apache.nifi.authorization.AuthorizableLookup
@ -30,9 +32,19 @@ import org.apache.nifi.authorization.resource.ResourceFactory
import org.apache.nifi.authorization.user.NiFiUser
import org.apache.nifi.authorization.user.NiFiUserDetails
import org.apache.nifi.authorization.user.StandardNiFiUser
import org.apache.nifi.connectable.Connection
import org.apache.nifi.controller.flow.StandardFlowManager
import org.apache.nifi.controller.repository.FlowFileEvent
import org.apache.nifi.controller.repository.FlowFileEventRepository
import org.apache.nifi.controller.service.ControllerServiceProvider
import org.apache.nifi.controller.status.PortStatus
import org.apache.nifi.controller.status.ProcessGroupStatus
import org.apache.nifi.controller.status.RunStatus
import org.apache.nifi.diagnostics.StorageUsage
import org.apache.nifi.diagnostics.SystemDiagnostics
import org.apache.nifi.reporting.Bulletin
import org.apache.nifi.reporting.BulletinRepository
import org.apache.nifi.util.MockBulletinRepository
import org.apache.nifi.web.api.dto.AccessPolicyDTO
import org.apache.nifi.web.api.dto.BulletinDTO
import org.apache.nifi.web.api.dto.DtoFactory
@ -898,6 +910,151 @@ class StandardNiFiServiceFacadeSpec extends Specification {
}
def "Test REST API Prometheus Metrics Endpoint"() {
given:
def serviceFacade = new StandardNiFiServiceFacade()
BulletinRepository bulletinRepository = new MockBulletinRepository()
serviceFacade.setBulletinRepository(bulletinRepository)
ControllerFacade controllerFacade = Mock()
serviceFacade.setControllerFacade(controllerFacade)
controllerFacade.getInstanceId() >> "ABC"
controllerFacade.getMaxEventDrivenThreadCount() >> 1
controllerFacade.getMaxTimerDrivenThreadCount() >> 10
// Setting up storage repositories
StorageUsage flowFileStorage = new StorageUsage()
flowFileStorage.setIdentifier("flowFile")
flowFileStorage.setTotalSpace(222)
flowFileStorage.setFreeSpace(111)
StorageUsage contentStorage = new StorageUsage()
contentStorage.setIdentifier("default")
contentStorage.setTotalSpace(444)
contentStorage.setFreeSpace(111)
Map<String, StorageUsage> contentStorageMap = new HashMap<>()
contentStorageMap.put("default", contentStorage)
StorageUsage provenanceStorage = new StorageUsage()
provenanceStorage.setIdentifier("default")
provenanceStorage.setTotalSpace(666)
provenanceStorage.setFreeSpace(111)
Map<String, StorageUsage> provenanceStorageMap = new HashMap<>()
provenanceStorageMap.put("default", provenanceStorage)
// Setting up SystemDiagnostics
SystemDiagnostics systemDiagnostics = new SystemDiagnostics()
systemDiagnostics.setFlowFileRepositoryStorageUsage(flowFileStorage)
systemDiagnostics.setContentRepositoryStorageUsage(contentStorageMap)
systemDiagnostics.setProvenanceRepositoryStorageUsage(provenanceStorageMap)
controllerFacade.getSystemDiagnostics() >> systemDiagnostics
// Setting up flow
ProcessGroupStatus rootGroupStatus = new ProcessGroupStatus()
rootGroupStatus.setId("1234");
rootGroupStatus.setFlowFilesReceived(5);
rootGroupStatus.setBytesReceived(10000);
rootGroupStatus.setFlowFilesSent(10);
rootGroupStatus.setBytesSent(20000);
rootGroupStatus.setQueuedCount(100);
rootGroupStatus.setQueuedContentSize(1024L);
rootGroupStatus.setBytesRead(60000L);
rootGroupStatus.setBytesWritten(80000L);
rootGroupStatus.setActiveThreadCount(5);
rootGroupStatus.setName("root");
rootGroupStatus.setFlowFilesTransferred(5);
rootGroupStatus.setBytesTransferred(10000);
rootGroupStatus.setOutputContentSize(1000L);
rootGroupStatus.setInputContentSize(1000L);
rootGroupStatus.setOutputCount(100);
rootGroupStatus.setInputCount(1000);
PortStatus outputPortStatus = new PortStatus();
outputPortStatus.setId("9876");
outputPortStatus.setName("out");
outputPortStatus.setGroupId("1234");
outputPortStatus.setRunStatus(RunStatus.Stopped);
outputPortStatus.setActiveThreadCount(1);
rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
// Create a nested group status
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setFlowFilesReceived(5);
groupStatus2.setBytesReceived(10000);
groupStatus2.setFlowFilesSent(10);
groupStatus2.setBytesSent(20000);
groupStatus2.setQueuedCount(100);
groupStatus2.setQueuedContentSize(1024L);
groupStatus2.setActiveThreadCount(2);
groupStatus2.setBytesRead(12345L);
groupStatus2.setBytesWritten(11111L);
groupStatus2.setFlowFilesTransferred(5);
groupStatus2.setBytesTransferred(10000);
groupStatus2.setOutputContentSize(1000L);
groupStatus2.setInputContentSize(1000L);
groupStatus2.setOutputCount(100);
groupStatus2.setInputCount(1000);
groupStatus2.setId("3378");
groupStatus2.setName("nestedPG");
Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
// setting up flowFile events
controllerFacade.getProcessGroupStatus("root") >> rootGroupStatus
FlowFileEventRepository flowFileEventRepository = Mock()
controllerFacade.getFlowFileEventRepository() >> flowFileEventRepository
FlowFileEvent aggregateEvent = Mock()
flowFileEventRepository.reportAggregateEvent() >> aggregateEvent
// setting up connections (empty list for testing)
Set<Connection> connections = new HashSet()
StandardFlowManager flowManager = Mock()
controllerFacade.getFlowManager() >> flowManager
flowManager.findAllConnections() >> connections
when:
Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics()
// Converts metrics into a String for testing
Writer writer = new StringWriter();
for (CollectorRegistry collectorRegistry : allRegistries) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
}
String output = writer.toString();
writer.close()
// rename root group and generate metrics again to a different string
rootGroupStatus.setName("rootroot")
allRegistries = serviceFacade.generateFlowMetrics()
writer = new StringWriter()
for (CollectorRegistry collectorRegistry : allRegistries) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples())
}
String output2 = writer.toString()
writer.close()
then:
// flow metrics
output.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0");
// jvm
output.contains("nifi_jvm_heap_used{instance=\"ABC\",}")
output.contains("# HELP nifi_jvm_heap_used NiFi JVM heap used")
output.contains("# TYPE nifi_jvm_heap_used gauge")
output.contains("nifi_jvm_thread_count{instance=\"ABC\",}")
// test that renamed items are in the metrics output and that the previously named versions have been removed from the metrics output.
output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0");
output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0");
!output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
!output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0");
}
private UserGroupDTO createUserGroupDTO() {
new UserGroupDTO(id: 'group-1', name: 'test group', users: [createUserEntity()] as Set)
}