NIFI-7017: This closes #3988. Fixed PrometheusReportingTask for nested PG status

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Matthew Burgess 2020-01-14 13:17:34 -05:00 committed by Joe Witt
parent 103325354b
commit bb699e7497
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 37 additions and 9 deletions

View File

@ -329,17 +329,19 @@ public class PrometheusMetricsUtil {
final String componentId = status.getId(); final String componentId = status.getId();
final String componentName = status.getName(); final String componentName = status.getName();
// Clear all collectors to deal with removed/renamed components // Clear all collectors to deal with removed/renamed components -- for root PG only
if("RootProcessGroup".equals(componentType)) {
try { try {
for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) { for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) { if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) {
SimpleCollector sc = (SimpleCollector)(field.get(null)); SimpleCollector<?> sc = (SimpleCollector<?>) (field.get(null));
sc.clear(); sc.clear();
} }
} }
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
// ignore // ignore
} }
}
AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent()); AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred()); AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
@ -372,7 +374,7 @@ public class PrometheusMetricsUtil {
// Report metrics for child process groups if specified // Report metrics for child process groups if specified
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup", metricsStrategy)); status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
} }
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {

View File

@ -19,6 +19,8 @@ package org.apache.nifi.reporting.prometheus;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
@ -93,6 +95,28 @@ public class TestPrometheusReportingTask {
outputPortStatus.setActiveThreadCount(1); outputPortStatus.setActiveThreadCount(1);
rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus)); rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
// Create a nested group status
ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
groupStatus2.setFlowFilesReceived(5);
groupStatus2.setBytesReceived(10000);
groupStatus2.setFlowFilesSent(10);
groupStatus2.setBytesSent(20000);
groupStatus2.setQueuedCount(100);
groupStatus2.setQueuedContentSize(1024L);
groupStatus2.setActiveThreadCount(2);
groupStatus2.setBytesRead(12345L);
groupStatus2.setBytesWritten(11111L);
groupStatus2.setFlowFilesTransferred(5);
groupStatus2.setBytesTransferred(10000);
groupStatus2.setOutputContentSize(1000L);
groupStatus2.setInputContentSize(1000L);
groupStatus2.setOutputCount(100);
groupStatus2.setInputCount(1000);
groupStatus2.setId("3378");
groupStatus2.setName("nestedPG");
Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
nestedGroupStatuses.add(groupStatus2);
rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
} }
@Test @Test
@ -107,6 +131,8 @@ public class TestPrometheusReportingTask {
"nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0")); "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
Assert.assertTrue(content.contains( Assert.assertTrue(content.contains(
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0")); "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
Assert.assertTrue(content.contains(
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0"));
// Rename the component // Rename the component
rootGroupStatus.setName("rootroot"); rootGroupStatus.setName("rootroot");