diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java index 2d3b5feba4..97eca61a8c 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java @@ -153,6 +153,12 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry { "source_id", "source_name", "destination_id", "destination_name") .register(registry)); + nameToGaugeMap.put("TOTAL_TASK_DURATION", Gauge.build() + .name("nifi_total_task_duration") + .help("The total number of thread-milliseconds that the component has used to complete its tasks in the past 5 minutes") + .labelNames("instance", "component_type", "component_name", "component_id", "parent_id") + .register(registry)); + // Processor metrics nameToGaugeMap.put("PROCESSOR_COUNTERS", Gauge.build() .name("nifi_processor_counters") diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index f52a5cb56e..6f1118ec65 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -49,6 +49,8 @@ public class PrometheusMetricsUtil { private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry(); protected static final String DEFAULT_LABEL_STRING = ""; + private static final double MAXIMUM_BACKPRESSURE = 1.0; + private static final double UNDEFINED_BACKPRESSURE = -1.0; // Common properties/values public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", @@ -174,6 +176,7 @@ public class PrometheusMetricsUtil { instanceId, procComponentType, procComponentName, procComponentId, parentId); } + for (ConnectionStatus connectionStatus : status.getConnectionStatus()) { final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId(); final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName(); @@ -212,6 +215,7 @@ public class PrometheusMetricsUtil { nifiMetricsRegistry.setDataPoint(isBackpressureEnabled ? 1 : 0, "IS_BACKPRESSURE_ENABLED", instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName); } + for (PortStatus portStatus : status.getInputPortStatus()) { final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId(); @@ -312,9 +316,10 @@ public class PrometheusMetricsUtil { return jvmMetricsRegistry.getRegistry(); } - public static CollectorRegistry createConnectionStatusAnalyticsMetrics(ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, StatusAnalytics statusAnalytics, - String instId, String connComponentType, String connName, - String connId, String pgId, String srcId, String srcName, String destId, String destName) { + public static CollectorRegistry createConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, final StatusAnalytics statusAnalytics, + final String instId, final String connComponentType, final String connName, final String connId, + final String pgId, final String srcId, final String srcName, final String destId, final String destName) { + if(statusAnalytics != null) { final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; final String connComponentId = StringUtils.isEmpty(connId) ? DEFAULT_LABEL_STRING : connId; @@ -362,4 +367,149 @@ public class PrometheusMetricsUtil { bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level); return bulletinMetricsRegistry.getRegistry(); } + + public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map aggregatedMetrics) { + status.getProcessGroupStatus().forEach((childGroupStatus) -> aggregatePercentUsed(childGroupStatus, aggregatedMetrics)); + + for (ConnectionStatus connectionStatus : status.getConnectionStatus()) { + final double percentUsedBytes = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold()); + final double percentUsedCount = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold()); + + determineMaxValueForPercentUsed(aggregatedMetrics, + "nifi_percent_used_bytes", + percentUsedBytes); + + determineMaxValueForPercentUsed(aggregatedMetrics, + "nifi_percent_used_count", + percentUsedCount); + + setBackpressure(aggregatedMetrics, percentUsedBytes, "bytesAtBackpressure"); + setBackpressure(aggregatedMetrics, percentUsedCount, "countAtBackpressure"); + } + } + + public static void aggregateConnectionPredictionMetrics(final Map aggregatedMetrics, final Map predictions) { + determineMinValueForPredictions(aggregatedMetrics, + "nifi_time_to_bytes_backpressure_prediction", + predictions.get("timeToBytesBackpressureMillis"), + "bytesAtBackpressure"); + + determineMinValueForPredictions(aggregatedMetrics, + "nifi_time_to_count_backpressure_prediction", + predictions.get("timeToCountBackpressureMillis"), + "countAtBackpressure"); + } + + private static void setBackpressure(final Map aggregatedMetrics, final double percentUsed, final String atBackpressureKey) { + if (percentUsed >= 100) { + aggregatedMetrics.put(atBackpressureKey, MAXIMUM_BACKPRESSURE); + } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) { + aggregatedMetrics.put(atBackpressureKey, 0.0); + } + } + + private static void determineMinValueForPredictions(final Map aggregatedMetrics, final String metricFamilySamplesName, + final double metricSampleValue, final String atBackpressureKey) { + final Double currentValue = aggregatedMetrics.get(metricFamilySamplesName); + if (aggregatedMetrics.get(atBackpressureKey) != null && aggregatedMetrics.get(atBackpressureKey) == MAXIMUM_BACKPRESSURE) { + aggregatedMetrics.put(metricFamilySamplesName, 0.0); + } else if (currentValue == null) { + aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue); + } else if (metricSampleValue > -1) { + if (currentValue == -1) { + aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue); + } else { + aggregatedMetrics.put(metricFamilySamplesName, Math.min(metricSampleValue, currentValue)); + } + } + } + + private static void determineMaxValueForPercentUsed(final Map aggregatedMetrics, final String metricFamilySamplesName, final double metricSampleValue) { + + Double currentValue = aggregatedMetrics.get(metricFamilySamplesName); + if (currentValue == null) { + currentValue = 0.0; + } + aggregatedMetrics.put(metricFamilySamplesName, Math.max(metricSampleValue, currentValue)); + } + + public static CollectorRegistry createAggregatedConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, + final Map aggregatedMetrics, + final String instId, final String compType, final String compName, final String compId) { + + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType; + final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName; + final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId; + final Double bytesValue = aggregatedMetrics.get("nifi_time_to_bytes_backpressure_prediction"); + final Double countsValue = aggregatedMetrics.get("nifi_time_to_count_backpressure_prediction"); + final double bytesBackpressure = bytesValue == null ? UNDEFINED_BACKPRESSURE : bytesValue; + final double countsBackpressure = countsValue == null ? UNDEFINED_BACKPRESSURE : countsValue; + + connectionAnalyticsMetricsRegistry.setDataPoint(bytesBackpressure, + "TIME_TO_BYTES_BACKPRESSURE_PREDICTION", + instanceId, + componentType, + componentName, + componentId, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING); + + connectionAnalyticsMetricsRegistry.setDataPoint(countsBackpressure, + "TIME_TO_COUNT_BACKPRESSURE_PREDICTION", + instanceId, + componentType, + componentName, + componentId, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING); + + return connectionAnalyticsMetricsRegistry.getRegistry(); + } + + public static CollectorRegistry createAggregatedNifiMetrics(final NiFiMetricsRegistry niFiMetricsRegistry, + final Map aggregatedMetrics, + final String instId, final String compType, final String compName, final String compId) { + + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType; + final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName; + final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId; + final Double bytesValue = aggregatedMetrics.get("nifi_percent_used_bytes"); + final Double countsValue = aggregatedMetrics.get("nifi_percent_used_count"); + final double percentBytes = bytesValue == null ? 0.0 : bytesValue; + final double percentCount = countsValue == null ? 0.0 : countsValue; + + niFiMetricsRegistry.setDataPoint(percentBytes, + "PERCENT_USED_BYTES", + instanceId, + componentType, + componentName, + componentId, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING); + + niFiMetricsRegistry.setDataPoint(percentCount, + "PERCENT_USED_COUNT", + instanceId, + componentType, + componentName, + componentId, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING, + DEFAULT_LABEL_STRING); + + return niFiMetricsRegistry.getRegistry(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 7d0127200e..3f9c864d48 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -94,6 +94,8 @@ import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -253,6 +255,7 @@ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; import org.apache.nifi.web.api.entity.ActionEntity; @@ -374,6 +377,7 @@ import java.util.stream.Stream; public class StandardNiFiServiceFacade implements NiFiServiceFacade { private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); private static final int VALIDATION_WAIT_MILLIS = 50; + private static final String ROOT_PROCESS_GROUP = "RootProcessGroup"; // nifi core components private ControllerFacade controllerFacade; @@ -5612,7 +5616,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root"); nifiMetricsRegistry.clear(); - PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup", + PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", ROOT_PROCESS_GROUP, PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue()); // Add the total byte counts (read/written) to the NiFi metrics registry @@ -5621,22 +5625,42 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final String rootPGName = StringUtils.isEmpty(rootPGStatus.getName()) ? "" : rootPGStatus.getName(); final FlowFileEvent aggregateEvent = flowFileEventRepository.reportAggregateEvent(); nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesRead(), "TOTAL_BYTES_READ", - instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesWritten(), "TOTAL_BYTES_WRITTEN", - instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesSent(), "TOTAL_BYTES_SENT", - instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED", - instanceId, "RootProcessGroup", rootPGName, rootPGId, ""); + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); + + //Add total task duration for root to the NiFi metrics registry + // The latest aggregated status history is the last element in the list so we need the last element only + final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId); + final List aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots(); + final int lastIndex = aggregatedStatusHistory.size() -1; + final String taskDurationInMillis = ProcessGroupStatusDescriptor.TASK_MILLIS.getField(); + long taskDuration = 0; + if (!aggregatedStatusHistory.isEmpty()) { + final StatusSnapshotDTO latestStatusHistory = aggregatedStatusHistory.get(lastIndex); + taskDuration = latestStatusHistory.getStatusMetrics().get(taskDurationInMillis); + } + nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION", + instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, ""); PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId); + final Map aggregatedMetrics = new HashMap<>(); + PrometheusMetricsUtil.aggregatePercentUsed(rootPGStatus, aggregatedMetrics); + PrometheusMetricsUtil.createAggregatedNifiMetrics(nifiMetricsRegistry, aggregatedMetrics, instanceId,ROOT_PROCESS_GROUP, rootPGName, rootPGId); + // Get Connection Status Analytics (predictions, e.g.) Set connections = controllerFacade.getFlowManager().findAllConnections(); for (Connection c : connections) { // If a ResourceNotFoundException is thrown, analytics hasn't been enabled try { - PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()), + final StatusAnalytics statusAnalytics = controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()); + PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, + statusAnalytics, instanceId, "Connection", c.getName(), @@ -5647,10 +5671,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { c.getDestination().getName(), c.getDestination().getIdentifier() ); + PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, statusAnalytics.getPredictions()); } catch (ResourceNotFoundException rnfe) { break; } } + PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, aggregatedMetrics, instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId); // Create a query to get all bulletins final BulletinQueryDTO query = new BulletinQueryDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy index 3cd9fff48f..653321ff1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy @@ -32,7 +32,12 @@ import org.apache.nifi.authorization.resource.ResourceFactory import org.apache.nifi.authorization.user.NiFiUser import org.apache.nifi.authorization.user.NiFiUserDetails import org.apache.nifi.authorization.user.StandardNiFiUser +import org.apache.nifi.components.state.StateManagerProvider import org.apache.nifi.connectable.Connection +import org.apache.nifi.controller.NodeTypeProvider +import org.apache.nifi.controller.ProcessScheduler +import org.apache.nifi.controller.ReloadComponent +import org.apache.nifi.controller.flow.FlowManager import org.apache.nifi.controller.flow.StandardFlowManager import org.apache.nifi.controller.repository.FlowFileEvent import org.apache.nifi.controller.repository.FlowFileEventRepository @@ -42,20 +47,31 @@ import org.apache.nifi.controller.status.ProcessGroupStatus import org.apache.nifi.controller.status.RunStatus import org.apache.nifi.diagnostics.StorageUsage import org.apache.nifi.diagnostics.SystemDiagnostics +import org.apache.nifi.encrypt.PropertyEncryptor +import org.apache.nifi.groups.ProcessGroup +import org.apache.nifi.groups.StandardProcessGroup +import org.apache.nifi.nar.ExtensionManager +import org.apache.nifi.registry.flow.FlowRegistryClient +import org.apache.nifi.registry.variable.MutableVariableRegistry import org.apache.nifi.reporting.Bulletin import org.apache.nifi.reporting.BulletinRepository import org.apache.nifi.util.MockBulletinRepository +import org.apache.nifi.util.NiFiProperties import org.apache.nifi.web.api.dto.AccessPolicyDTO import org.apache.nifi.web.api.dto.BulletinDTO import org.apache.nifi.web.api.dto.DtoFactory import org.apache.nifi.web.api.dto.EntityFactory +import org.apache.nifi.web.api.dto.PermissionsDTO import org.apache.nifi.web.api.dto.RevisionDTO import org.apache.nifi.web.api.dto.UserDTO import org.apache.nifi.web.api.dto.UserGroupDTO +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO import org.apache.nifi.web.api.entity.BulletinEntity +import org.apache.nifi.web.api.entity.StatusHistoryEntity import org.apache.nifi.web.api.entity.UserEntity import org.apache.nifi.web.controller.ControllerFacade import org.apache.nifi.web.dao.AccessPolicyDAO +import org.apache.nifi.web.dao.ProcessGroupDAO import org.apache.nifi.web.dao.UserDAO import org.apache.nifi.web.dao.UserGroupDAO import org.apache.nifi.web.revision.DeleteRevisionTask @@ -1008,6 +1024,24 @@ class StandardNiFiServiceFacadeSpec extends Specification { FlowFileEvent aggregateEvent = Mock() flowFileEventRepository.reportAggregateEvent() >> aggregateEvent + ProcessGroupDAO processGroupDAO = Mock() + serviceFacade.setProcessGroupDAO(processGroupDAO) + ProcessGroup processGroup = Mock() + processGroupDAO.getProcessGroup(rootGroupStatus.getId()) >> processGroup + DtoFactory dtoFactory = new DtoFactory() + serviceFacade.setDtoFactory(dtoFactory) + PermissionsDTO permissions = Mock() + dtoFactory.createPermissionsDto(processGroup) >> permissions + StatusHistoryEntity statusHistoryEntity = new StatusHistoryEntity() + StatusHistoryDTO statusHistory = new StatusHistoryDTO() + statusHistory.setAggregateSnapshots(Collections.EMPTY_LIST) + statusHistoryEntity.setStatusHistory(statusHistory) + controllerFacade.getProcessGroupStatusHistory("1234") >> statusHistory + EntityFactory entityFactory = new EntityFactory() + serviceFacade.setEntityFactory(entityFactory) + entityFactory.createStatusHistoryEntity(statusHistoryEntity, permissions) >> statusHistoryEntity + serviceFacade.getProcessGroupStatusHistory("1234") >> statusHistory + // setting up connections (empty list for testing) Set connections = new HashSet() StandardFlowManager flowManager = Mock() diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java new file mode 100644 index 0000000000..c0899cef63 --- /dev/null +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusMetricsUtil.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.prometheus; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.prometheus.util.AbstractMetricsRegistry; +import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry; +import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; +import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.nifi.util.StringUtils.EMPTY; +import static org.hamcrest.CoreMatchers.everyItem; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestPrometheusMetricsUtil { + private static final long DEFAULT_PREDICTION_VALUE = -1L; + private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0; + private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0; + private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0; + private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0; + private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0; + private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0; + private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0; + private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0; + private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes"; + private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count"; + private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure"; + private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure"; + private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction"; + private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction"; + private static final String CONNECTION_1 = "Connection1"; + private static final String CONNECTION_2 = "Connection2"; + private static final String CONNECTION_3 = "Connection3"; + private static final String CONNECTION_4 = "Connection4"; + private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis"; + private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis"; + + private static ProcessGroupStatus singleProcessGroupStatus; + private static ProcessGroupStatus nestedProcessGroupStatus; + private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure; + private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure; + private static Set connections; + private static Map> mixedValuedPredictions; + private static Map> defaultValuedPredictions; + + @BeforeAll + public static void setup() { + singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1); + nestedProcessGroupStatus = createNestedProcessGroupStatus(); + singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1); + nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure(); + connections = createConnections(); + mixedValuedPredictions = createPredictionsWithMixedValue(); + defaultValuedPredictions = createPredictionsWithDefaultValuesOnly(); + } + + @Test + public void testAggregatePercentUsedWithSingleProcessGroup() { + final Map aggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregatedMetrics); + + assertEquals(4, aggregatedMetrics.size()); + assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)); + assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)); + } + + @Test + public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() { + final Map aggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregatedMetrics); + + assertEquals(4, aggregatedMetrics.size()); + assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)); + assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)); + assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)); + } + + @Test + public void testAggregatePercentUsedWithNestedProcessGroups() { + final Map aggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregatedMetrics); + + assertEquals(4, aggregatedMetrics.size()); + assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)); + assertEquals(EXPECTED_NESTED_COUNT_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)); + } + + @Test + public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() { + final Map aggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregatedMetrics); + + assertEquals(4, aggregatedMetrics.size()); + assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES)); + assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT)); + assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE)); + assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE)); + } + + @Test + public void testAggregateConnectionPredictionsWithMixedValues() { + Map aggregatedMetrics = new HashMap<>(); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions); + + assertEquals(2, aggregatedMetrics.size()); + assertEquals(1.0, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)); + assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)); + } + + @Test + public void testAggregateConnectionPredictionsWithAllDefaultValues() { + Map aggregatedMetrics = new HashMap<>(); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions); + + assertEquals(2, aggregatedMetrics.size()); + assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)); + assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)); + } + + @Test + public void testAggregateConnectionPredictionsWithBackpressure() { + Map aggregatedMetrics = new HashMap<>(); + aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0); + aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions); + + assertEquals(EXPECTED_BACKPRESSURE_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION)); + assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION)); + } + + @Test + public void testAggregatedConnectionPredictionsDatapointCreationWithAnalyticsNotSet() { + ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry(); + Map emptyAggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, + emptyAggregatedMetrics, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry); + + assertTrue(emptyAggregatedMetrics.isEmpty()); + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PREDICTION_VALUE))); + } + + @Test + public void testAggregatedConnectionPredictionsDatapointCreationWithAllDefaultValues() { + ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry(); + Map aggregatedMetrics = new HashMap<>(); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions); + + PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, + aggregatedMetrics, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry); + + assertEquals(2, aggregatedMetrics.size()); + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PREDICTION_VALUE))); + } + + @Test + public void testAggregatedConnectionPredictionsDatapointCreationWithMixedValues() { + ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry(); + Map aggregatedMetrics = new HashMap<>(); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions); + + PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, + aggregatedMetrics, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry); + + assertEquals(2, aggregatedMetrics.size()); + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, hasItems(1.0, 2.0)); + } + + @Test + public void testAggregatedConnectionPredictionsDatapointCreationWithBackpressure() { + ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry(); + Map aggregatedMetrics = new HashMap<>(); + aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0); + aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0); + generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions); + + PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, + aggregatedMetrics, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry); + + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, hasItems(0.0, 2.0)); + } + + @Test + public void testAggregatedNifiMetricsDatapointCreationWithoutResults() { + NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry(); + Map emptyAggregatedMetrics = new HashMap<>(); + + PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry, + emptyAggregatedMetrics, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(niFiMetricsRegistry); + + assertTrue(emptyAggregatedMetrics.isEmpty()); + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PERCENT_USED_VALUE))); + } + + @Test + public void testAggregatedNifiMetricsDatapointCreationWithSingleProcessGroup() { + NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry(); + Map result = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, result); + PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry, + result, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(niFiMetricsRegistry); + + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PERCENT_USED_VALUE))); + } + + @Test + public void testAggregatedNifiMetricsDatapointCreationWithNestedProcessGroup() { + NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry(); + Map result = new HashMap<>(); + + PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, result); + PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry, + result, + EMPTY, + EMPTY, + EMPTY, + EMPTY); + + List sampleValues = getSampleValuesList(niFiMetricsRegistry); + + assertEquals(2, sampleValues.size()); + assertThat(sampleValues, hasItems(EXPECTED_NESTED_BYTES_PERCENT_VALUE, EXPECTED_NESTED_COUNT_PERCENT_VALUE)); + } + + private static ProcessGroupStatus createSingleProcessGroupStatus(final long queuedBytes, final long bytesThreshold, final int queuedCount, final long objectThreshold) { + ProcessGroupStatus singleStatus = new ProcessGroupStatus(); + List connectionStatuses = new ArrayList<>(); + ConnectionStatus connectionStatus = new ConnectionStatus(); + + connectionStatus.setQueuedBytes(queuedBytes); + connectionStatus.setBackPressureBytesThreshold(bytesThreshold); + connectionStatus.setQueuedCount(queuedCount); + connectionStatus.setBackPressureObjectThreshold(objectThreshold); + connectionStatuses.add(connectionStatus); + singleStatus.setConnectionStatus(connectionStatuses); + + return singleStatus; + } + + private static ProcessGroupStatus createNestedProcessGroupStatus() { + ProcessGroupStatus rootStatus = new ProcessGroupStatus(); + ProcessGroupStatus status1 = createSingleProcessGroupStatus(15, 100, 10, 200); + ProcessGroupStatus status2 = createSingleProcessGroupStatus(150, 200, 5, 30); + + status1.setProcessGroupStatus(Collections.singletonList(status2)); + rootStatus.setProcessGroupStatus(Collections.singletonList(status1)); + + return rootStatus; + } + + private static ProcessGroupStatus createNestedProcessGroupStatusWithCountBackpressure() { + ProcessGroupStatus rootStatus = new ProcessGroupStatus(); + ProcessGroupStatus status1 = createSingleProcessGroupStatus(15, 100, 1, 1); + ProcessGroupStatus status2 = createSingleProcessGroupStatus(150, 200, 5, 30); + + status1.setProcessGroupStatus(Collections.singletonList(status2)); + rootStatus.setProcessGroupStatus(Collections.singletonList(status1)); + + return rootStatus; + } + + private static Map> createPredictionsWithMixedValue() { + Map> predictions = new HashMap<>(); + + predictions.put(CONNECTION_1, new HashMap() {{ + put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, Long.MAX_VALUE); + put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, Long.MAX_VALUE); + }}); + predictions.put(CONNECTION_2, new HashMap() {{ + put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE); + put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE); + }}); + predictions.put(CONNECTION_3, new HashMap() {{ + put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, 1L); + put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, 4L); + }}); + predictions.put(CONNECTION_4, new HashMap() {{ + put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, 3L); + put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, 2L); + }}); + return predictions; + } + + private static Map> createPredictionsWithDefaultValuesOnly() { + Map> predictions = new HashMap<>(); + Map defaultPredictions = new HashMap() {{ + put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE); + put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE); + }}; + + predictions.put(CONNECTION_1, defaultPredictions); + predictions.put(CONNECTION_2, defaultPredictions); + predictions.put(CONNECTION_3, defaultPredictions); + predictions.put(CONNECTION_4, defaultPredictions); + return predictions; + } + + private static Set createConnections() { + Set connections = new HashSet<>(); + connections.add(CONNECTION_1); + connections.add(CONNECTION_2); + connections.add(CONNECTION_3); + connections.add(CONNECTION_4); + return connections; + } + + private Map getPredictions(final Map> predictions, final String connection) { + return predictions.get(connection); + } + + private List getSampleValuesList(final AbstractMetricsRegistry metricsRegistry) { + return Collections.list(metricsRegistry.getRegistry().metricFamilySamples()) + .stream() + .flatMap(familySamples -> familySamples.samples.stream()) + .map(sample -> sample.value) + .collect(Collectors.toList()); + } + + private void generateConnectionAnalyticMetricsAggregation(final Map aggregatedMetrics, final Map> predictions) { + for (final String connection : connections) { + PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, getPredictions(predictions, connection)); + } + } +}