NIFI-9455 Added aggregated predictions to Prometheus Flow Metrics

This closes #5582

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Timea Barna 2021-11-02 13:05:45 +01:00 committed by exceptionfactory
parent 3ea9faccc6
commit 3ccc9d29b6
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 631 additions and 9 deletions

View File

@ -153,6 +153,12 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("TOTAL_TASK_DURATION", Gauge.build()
.name("nifi_total_task_duration")
.help("The total number of thread-milliseconds that the component has used to complete its tasks in the past 5 minutes")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
// Processor metrics
nameToGaugeMap.put("PROCESSOR_COUNTERS", Gauge.build()
.name("nifi_processor_counters")

View File

@ -49,6 +49,8 @@ public class PrometheusMetricsUtil {
private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
protected static final String DEFAULT_LABEL_STRING = "";
private static final double MAXIMUM_BACKPRESSURE = 1.0;
private static final double UNDEFINED_BACKPRESSURE = -1.0;
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
@ -174,6 +176,7 @@ public class PrometheusMetricsUtil {
instanceId, procComponentType, procComponentName, procComponentId, parentId);
}
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
final String connComponentId = StringUtils.isEmpty(connectionStatus.getId()) ? DEFAULT_LABEL_STRING : connectionStatus.getId();
final String connComponentName = StringUtils.isEmpty(connectionStatus.getName()) ? DEFAULT_LABEL_STRING : connectionStatus.getName();
@ -212,6 +215,7 @@ public class PrometheusMetricsUtil {
nifiMetricsRegistry.setDataPoint(isBackpressureEnabled ? 1 : 0, "IS_BACKPRESSURE_ENABLED",
instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
}
for (PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId();
@ -312,9 +316,10 @@ public class PrometheusMetricsUtil {
return jvmMetricsRegistry.getRegistry();
}
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, StatusAnalytics statusAnalytics,
String instId, String connComponentType, String connName,
String connId, String pgId, String srcId, String srcName, String destId, String destName) {
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, final StatusAnalytics statusAnalytics,
final String instId, final String connComponentType, final String connName, final String connId,
final String pgId, final String srcId, final String srcName, final String destId, final String destName) {
if(statusAnalytics != null) {
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
final String connComponentId = StringUtils.isEmpty(connId) ? DEFAULT_LABEL_STRING : connId;
@ -362,4 +367,149 @@ public class PrometheusMetricsUtil {
bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
return bulletinMetricsRegistry.getRegistry();
}
/**
 * Recursively walks the process-group hierarchy and records, into the supplied map, the
 * maximum queue utilization (bytes and count, as percentages) observed across all
 * connections, plus latch flags marking whether any connection has reached backpressure.
 *
 * @param aggregatedMetrics running aggregate, keyed by Prometheus metric family name
 */
public static void aggregatePercentUsed(final ProcessGroupStatus status, final Map<String, Double> aggregatedMetrics) {
    // Descend into child groups first so their connections feed the same aggregate map.
    for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
        aggregatePercentUsed(childStatus, aggregatedMetrics);
    }
    status.getConnectionStatus().forEach(connectionStatus -> {
        final double usedBytesPercent = getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold());
        final double usedCountPercent = getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold());
        determineMaxValueForPercentUsed(aggregatedMetrics, "nifi_percent_used_bytes", usedBytesPercent);
        determineMaxValueForPercentUsed(aggregatedMetrics, "nifi_percent_used_count", usedCountPercent);
        setBackpressure(aggregatedMetrics, usedBytesPercent, "bytesAtBackpressure");
        setBackpressure(aggregatedMetrics, usedCountPercent, "countAtBackpressure");
    });
}
/**
 * Folds one connection's backpressure predictions into the aggregate, keeping the minimum
 * (i.e. soonest) predicted time-to-backpressure across all connections.
 *
 * @param aggregatedMetrics running aggregate, keyed by Prometheus metric family name
 * @param predictions       per-connection prediction values (millis), as returned by
 *                          StatusAnalytics.getPredictions()
 */
public static void aggregateConnectionPredictionMetrics(final Map<String, Double> aggregatedMetrics, final Map<String, Long> predictions) {
    // Default to -1L ("no prediction available") when a key is absent; a plain get() would
    // throw a NullPointerException while unboxing the Long into the double parameter.
    determineMinValueForPredictions(aggregatedMetrics,
            "nifi_time_to_bytes_backpressure_prediction",
            predictions.getOrDefault("timeToBytesBackpressureMillis", -1L),
            "bytesAtBackpressure");
    determineMinValueForPredictions(aggregatedMetrics,
            "nifi_time_to_count_backpressure_prediction",
            predictions.getOrDefault("timeToCountBackpressureMillis", -1L),
            "countAtBackpressure");
}
// Marks whether any connection has hit 100% utilization for the given dimension.
// Once at backpressure the flag latches to MAXIMUM_BACKPRESSURE (1.0); otherwise the
// key is initialized to 0.0 the first time the dimension is seen.
private static void setBackpressure(final Map<String, Double> aggregatedMetrics, final double percentUsed, final String atBackpressureKey) {
    final boolean atBackpressure = percentUsed >= 100;
    if (atBackpressure) {
        aggregatedMetrics.put(atBackpressureKey, MAXIMUM_BACKPRESSURE);
    } else if (!aggregatedMetrics.containsKey(atBackpressureKey)) {
        aggregatedMetrics.put(atBackpressureKey, 0.0);
    }
}
// Keeps the minimum (soonest) time-to-backpressure prediction seen so far for the given
// metric family. Prediction values use -1 as a sentinel meaning "no prediction available".
private static void determineMinValueForPredictions(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName,
final double metricSampleValue, final String atBackpressureKey) {
final Double currentValue = aggregatedMetrics.get(metricFamilySamplesName);
// If this dimension is already at backpressure, time-to-backpressure is effectively now.
if (aggregatedMetrics.get(atBackpressureKey) != null && aggregatedMetrics.get(atBackpressureKey) == MAXIMUM_BACKPRESSURE) {
aggregatedMetrics.put(metricFamilySamplesName, 0.0);
} else if (currentValue == null) {
// First sample for this metric: record it as-is (this may be the -1 sentinel).
aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
} else if (metricSampleValue > -1) {
if (currentValue == -1) {
// Replace the "undefined" sentinel with the first real prediction value.
aggregatedMetrics.put(metricFamilySamplesName, metricSampleValue);
} else {
aggregatedMetrics.put(metricFamilySamplesName, Math.min(metricSampleValue, currentValue));
}
}
}
// Records the largest percent-used value observed so far for the given metric family.
// A missing entry is treated as 0.0 before the comparison.
private static void determineMaxValueForPercentUsed(final Map<String, Double> aggregatedMetrics, final String metricFamilySamplesName, final double metricSampleValue) {
    final Double previous = aggregatedMetrics.get(metricFamilySamplesName);
    final double baseline = (previous == null) ? 0.0 : previous;
    aggregatedMetrics.put(metricFamilySamplesName, Math.max(metricSampleValue, baseline));
}
/**
 * Publishes the flow-wide (aggregated) backpressure prediction metrics to the supplied
 * connection-analytics registry and returns its Prometheus CollectorRegistry.
 * Missing aggregate entries are reported with the "undefined" sentinel (-1).
 */
public static CollectorRegistry createAggregatedConnectionStatusAnalyticsMetrics(final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry,
                                                                                 final Map<String, Double> aggregatedMetrics,
                                                                                 final String instId, final String compType, final String compName, final String compId) {
    // Blank out any missing label values so every sample carries a full label set.
    final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
    final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
    final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName;
    final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;

    final Double aggregatedBytes = aggregatedMetrics.get("nifi_time_to_bytes_backpressure_prediction");
    final Double aggregatedCount = aggregatedMetrics.get("nifi_time_to_count_backpressure_prediction");
    final double bytesBackpressure = (aggregatedBytes == null) ? UNDEFINED_BACKPRESSURE : aggregatedBytes;
    final double countsBackpressure = (aggregatedCount == null) ? UNDEFINED_BACKPRESSURE : aggregatedCount;

    // Connection-specific labels (source/destination) do not apply to the aggregate.
    connectionAnalyticsMetricsRegistry.setDataPoint(bytesBackpressure, "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
            instanceId, componentType, componentName, componentId,
            DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING);
    connectionAnalyticsMetricsRegistry.setDataPoint(countsBackpressure, "TIME_TO_COUNT_BACKPRESSURE_PREDICTION",
            instanceId, componentType, componentName, componentId,
            DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING);
    return connectionAnalyticsMetricsRegistry.getRegistry();
}
/**
 * Publishes the flow-wide (aggregated) percent-used metrics to the supplied NiFi metrics
 * registry and returns its Prometheus CollectorRegistry.
 * Missing aggregate entries are reported as 0.0 (nothing queued anywhere).
 */
public static CollectorRegistry createAggregatedNifiMetrics(final NiFiMetricsRegistry niFiMetricsRegistry,
                                                            final Map<String, Double> aggregatedMetrics,
                                                            final String instId, final String compType, final String compName, final String compId) {
    // Blank out any missing label values so every sample carries a full label set.
    final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
    final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
    final String componentName = StringUtils.isEmpty(compName) ? DEFAULT_LABEL_STRING : compName;
    final String componentId = StringUtils.isEmpty(compId) ? DEFAULT_LABEL_STRING : compId;

    final Double aggregatedBytes = aggregatedMetrics.get("nifi_percent_used_bytes");
    final Double aggregatedCount = aggregatedMetrics.get("nifi_percent_used_count");
    final double percentBytes = (aggregatedBytes == null) ? 0.0 : aggregatedBytes;
    final double percentCount = (aggregatedCount == null) ? 0.0 : aggregatedCount;

    // Connection-specific labels do not apply to the aggregate; publish them blank.
    niFiMetricsRegistry.setDataPoint(percentBytes, "PERCENT_USED_BYTES",
            instanceId, componentType, componentName, componentId,
            DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING);
    niFiMetricsRegistry.setDataPoint(percentCount, "PERCENT_USED_COUNT",
            instanceId, componentType, componentName, componentId,
            DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING, DEFAULT_LABEL_STRING);
    return niFiMetricsRegistry.getRegistry();
}
}

View File

@ -94,6 +94,8 @@ import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -253,6 +255,7 @@ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
@ -374,6 +377,7 @@ import java.util.stream.Stream;
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
private static final int VALIDATION_WAIT_MILLIS = 50;
private static final String ROOT_PROCESS_GROUP = "RootProcessGroup";
// nifi core components
private ControllerFacade controllerFacade;
@ -5612,7 +5616,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
nifiMetricsRegistry.clear();
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup",
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", ROOT_PROCESS_GROUP,
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
// Add the total byte counts (read/written) to the NiFi metrics registry
@ -5621,22 +5625,42 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final String rootPGName = StringUtils.isEmpty(rootPGStatus.getName()) ? "" : rootPGStatus.getName();
final FlowFileEvent aggregateEvent = flowFileEventRepository.reportAggregateEvent();
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesRead(), "TOTAL_BYTES_READ",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesWritten(), "TOTAL_BYTES_WRITTEN",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesSent(), "TOTAL_BYTES_SENT",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
//Add total task duration for root to the NiFi metrics registry
// The latest aggregated status history is the last element in the list so we need the last element only
final StatusHistoryEntity rootGPStatusHistory = getProcessGroupStatusHistory(rootPGId);
final List<StatusSnapshotDTO> aggregatedStatusHistory = rootGPStatusHistory.getStatusHistory().getAggregateSnapshots();
final int lastIndex = aggregatedStatusHistory.size() -1;
final String taskDurationInMillis = ProcessGroupStatusDescriptor.TASK_MILLIS.getField();
long taskDuration = 0;
if (!aggregatedStatusHistory.isEmpty()) {
final StatusSnapshotDTO latestStatusHistory = aggregatedStatusHistory.get(lastIndex);
taskDuration = latestStatusHistory.getStatusMetrics().get(taskDurationInMillis);
}
nifiMetricsRegistry.setDataPoint(taskDuration, "TOTAL_TASK_DURATION",
instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
final Map<String, Double> aggregatedMetrics = new HashMap<>();
PrometheusMetricsUtil.aggregatePercentUsed(rootPGStatus, aggregatedMetrics);
PrometheusMetricsUtil.createAggregatedNifiMetrics(nifiMetricsRegistry, aggregatedMetrics, instanceId,ROOT_PROCESS_GROUP, rootPGName, rootPGId);
// Get Connection Status Analytics (predictions, e.g.)
Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
for (Connection c : connections) {
// If a ResourceNotFoundException is thrown, analytics hasn't been enabled
try {
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()),
final StatusAnalytics statusAnalytics = controllerFacade.getConnectionStatusAnalytics(c.getIdentifier());
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
statusAnalytics,
instanceId,
"Connection",
c.getName(),
@ -5647,10 +5671,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
c.getDestination().getName(),
c.getDestination().getIdentifier()
);
PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, statusAnalytics.getPredictions());
} catch (ResourceNotFoundException rnfe) {
break;
}
}
PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, aggregatedMetrics, instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId);
// Create a query to get all bulletins
final BulletinQueryDTO query = new BulletinQueryDTO();

View File

@ -32,7 +32,12 @@ import org.apache.nifi.authorization.resource.ResourceFactory
import org.apache.nifi.authorization.user.NiFiUser
import org.apache.nifi.authorization.user.NiFiUserDetails
import org.apache.nifi.authorization.user.StandardNiFiUser
import org.apache.nifi.components.state.StateManagerProvider
import org.apache.nifi.connectable.Connection
import org.apache.nifi.controller.NodeTypeProvider
import org.apache.nifi.controller.ProcessScheduler
import org.apache.nifi.controller.ReloadComponent
import org.apache.nifi.controller.flow.FlowManager
import org.apache.nifi.controller.flow.StandardFlowManager
import org.apache.nifi.controller.repository.FlowFileEvent
import org.apache.nifi.controller.repository.FlowFileEventRepository
@ -42,20 +47,31 @@ import org.apache.nifi.controller.status.ProcessGroupStatus
import org.apache.nifi.controller.status.RunStatus
import org.apache.nifi.diagnostics.StorageUsage
import org.apache.nifi.diagnostics.SystemDiagnostics
import org.apache.nifi.encrypt.PropertyEncryptor
import org.apache.nifi.groups.ProcessGroup
import org.apache.nifi.groups.StandardProcessGroup
import org.apache.nifi.nar.ExtensionManager
import org.apache.nifi.registry.flow.FlowRegistryClient
import org.apache.nifi.registry.variable.MutableVariableRegistry
import org.apache.nifi.reporting.Bulletin
import org.apache.nifi.reporting.BulletinRepository
import org.apache.nifi.util.MockBulletinRepository
import org.apache.nifi.util.NiFiProperties
import org.apache.nifi.web.api.dto.AccessPolicyDTO
import org.apache.nifi.web.api.dto.BulletinDTO
import org.apache.nifi.web.api.dto.DtoFactory
import org.apache.nifi.web.api.dto.EntityFactory
import org.apache.nifi.web.api.dto.PermissionsDTO
import org.apache.nifi.web.api.dto.RevisionDTO
import org.apache.nifi.web.api.dto.UserDTO
import org.apache.nifi.web.api.dto.UserGroupDTO
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO
import org.apache.nifi.web.api.entity.BulletinEntity
import org.apache.nifi.web.api.entity.StatusHistoryEntity
import org.apache.nifi.web.api.entity.UserEntity
import org.apache.nifi.web.controller.ControllerFacade
import org.apache.nifi.web.dao.AccessPolicyDAO
import org.apache.nifi.web.dao.ProcessGroupDAO
import org.apache.nifi.web.dao.UserDAO
import org.apache.nifi.web.dao.UserGroupDAO
import org.apache.nifi.web.revision.DeleteRevisionTask
@ -1008,6 +1024,24 @@ class StandardNiFiServiceFacadeSpec extends Specification {
FlowFileEvent aggregateEvent = Mock()
flowFileEventRepository.reportAggregateEvent() >> aggregateEvent
ProcessGroupDAO processGroupDAO = Mock()
serviceFacade.setProcessGroupDAO(processGroupDAO)
ProcessGroup processGroup = Mock()
processGroupDAO.getProcessGroup(rootGroupStatus.getId()) >> processGroup
DtoFactory dtoFactory = new DtoFactory()
serviceFacade.setDtoFactory(dtoFactory)
PermissionsDTO permissions = Mock()
dtoFactory.createPermissionsDto(processGroup) >> permissions
StatusHistoryEntity statusHistoryEntity = new StatusHistoryEntity()
StatusHistoryDTO statusHistory = new StatusHistoryDTO()
statusHistory.setAggregateSnapshots(Collections.EMPTY_LIST)
statusHistoryEntity.setStatusHistory(statusHistory)
controllerFacade.getProcessGroupStatusHistory("1234") >> statusHistory
EntityFactory entityFactory = new EntityFactory()
serviceFacade.setEntityFactory(entityFactory)
entityFactory.createStatusHistoryEntity(statusHistoryEntity, permissions) >> statusHistoryEntity
serviceFacade.getProcessGroupStatusHistory("1234") >> statusHistory
// setting up connections (empty list for testing)
Set<Connection> connections = new HashSet()
StandardFlowManager flowManager = Mock()

View File

@ -0,0 +1,406 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.util.StringUtils.EMPTY;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestPrometheusMetricsUtil {
// Raw prediction sentinel produced when no prediction is available, and its double form
// as it appears in the aggregated metrics map.
private static final long DEFAULT_PREDICTION_VALUE = -1L;
private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0;
// Expected prediction once a dimension is already at backpressure (time-to-backpressure = now).
private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0;
// Backpressure latch flag values (0.0 = not at backpressure, 1.0 = at backpressure).
private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0;
private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0;
// Percent-used expectations: empty queues, full queues, and the nested-group maxima.
private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0;
private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0;
private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0;
private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0;
// Keys used by PrometheusMetricsUtil inside the aggregated metrics map.
private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes";
private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count";
private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure";
private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure";
private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction";
private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction";
// Connection identifiers used as keys in the per-connection prediction fixtures.
private static final String CONNECTION_1 = "Connection1";
private static final String CONNECTION_2 = "Connection2";
private static final String CONNECTION_3 = "Connection3";
private static final String CONNECTION_4 = "Connection4";
// Prediction map keys as produced by StatusAnalytics.getPredictions().
private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
// Shared read-only fixtures, built once in setup().
private static ProcessGroupStatus singleProcessGroupStatus;
private static ProcessGroupStatus nestedProcessGroupStatus;
private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure;
private static Set<String> connections;
private static Map<String, Map<String, Long>> mixedValuedPredictions;
private static Map<String, Map<String, Long>> defaultValuedPredictions;
@BeforeAll
public static void setup() {
    // Build the shared, read-only fixtures once: group hierarchies with and without
    // backpressure, connection ids, and per-connection prediction maps.
    singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1);
    singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1);
    nestedProcessGroupStatus = createNestedProcessGroupStatus();
    nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure();
    connections = createConnections();
    mixedValuedPredictions = createPredictionsWithMixedValue();
    defaultValuedPredictions = createPredictionsWithDefaultValuesOnly();
}
@Test
public void testAggregatePercentUsedWithSingleProcessGroup() {
    // A single group with one empty connection: 0% utilization, no backpressure flags set.
    final Map<String, Double> aggregated = new HashMap<>();

    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregated);

    assertEquals(4, aggregated.size());
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(COUNT_AT_BACKPRESSURE));
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregated.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregated.get(NIFI_PERCENT_USED_COUNT));
}
@Test
public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() {
    // Queued bytes equal to the threshold: byte utilization hits 100% and the bytes flag latches.
    final Map<String, Double> aggregated = new HashMap<>();

    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregated);

    assertEquals(4, aggregated.size());
    assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregated.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregated.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregated.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregatePercentUsedWithNestedProcessGroups() {
    // Nested groups: the maximum utilization across all levels wins; no connection is full.
    final Map<String, Double> aggregated = new HashMap<>();

    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregated);

    assertEquals(4, aggregated.size());
    assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregated.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_NESTED_COUNT_PERCENT_VALUE, aggregated.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() {
    // Nested groups where one connection is at its object threshold: count flag latches.
    final Map<String, Double> aggregated = new HashMap<>();

    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregated);

    assertEquals(4, aggregated.size());
    assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregated.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregated.get(COUNT_AT_BACKPRESSURE));
    assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregated.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregated.get(BYTES_AT_BACKPRESSURE));
}
@Test
public void testAggregateConnectionPredictionsWithMixedValues() {
    // The minimum real (non -1) prediction per dimension wins across connections.
    final Map<String, Double> aggregated = new HashMap<>();

    generateConnectionAnalyticMetricsAggregation(aggregated, mixedValuedPredictions);

    assertEquals(2, aggregated.size());
    assertEquals(1.0, aggregated.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(2.0, aggregated.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregateConnectionPredictionsWithAllDefaultValues() {
    // When every connection reports the -1 sentinel, the aggregate stays at -1.
    final Map<String, Double> aggregated = new HashMap<>();

    generateConnectionAnalyticMetricsAggregation(aggregated, defaultValuedPredictions);

    assertEquals(2, aggregated.size());
    assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregated.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregated.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregateConnectionPredictionsWithBackpressure() {
    // A dimension already at backpressure forces its prediction to 0; the other aggregates normally.
    final Map<String, Double> aggregated = new HashMap<>();
    aggregated.put(BYTES_AT_BACKPRESSURE, 1.0);
    aggregated.put(COUNT_AT_BACKPRESSURE, 0.0);

    generateConnectionAnalyticMetricsAggregation(aggregated, mixedValuedPredictions);

    assertEquals(EXPECTED_BACKPRESSURE_PREDICTION_VALUE, aggregated.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(2.0, aggregated.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithAnalyticsNotSet() {
    // Without any aggregated analytics, both published samples fall back to -1.
    final ConnectionAnalyticsMetricsRegistry registry = new ConnectionAnalyticsMetricsRegistry();
    final Map<String, Double> emptyAggregatedMetrics = new HashMap<>();

    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(registry, emptyAggregatedMetrics, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertTrue(emptyAggregatedMetrics.isEmpty());
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PREDICTION_VALUE)));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithAllDefaultValues() {
    // Aggregating only -1 sentinels publishes -1 for both prediction samples.
    final ConnectionAnalyticsMetricsRegistry registry = new ConnectionAnalyticsMetricsRegistry();
    final Map<String, Double> aggregated = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregated, defaultValuedPredictions);

    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(registry, aggregated, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertEquals(2, aggregated.size());
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PREDICTION_VALUE)));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithMixedValues() {
    // Mixed predictions: the per-dimension minima (1.0 bytes, 2.0 count) are published.
    final ConnectionAnalyticsMetricsRegistry registry = new ConnectionAnalyticsMetricsRegistry();
    final Map<String, Double> aggregated = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregated, mixedValuedPredictions);

    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(registry, aggregated, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertEquals(2, aggregated.size());
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, hasItems(1.0, 2.0));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithBackpressure() {
    // Bytes dimension already at backpressure publishes 0.0; the count minimum (2.0) is kept.
    final ConnectionAnalyticsMetricsRegistry registry = new ConnectionAnalyticsMetricsRegistry();
    final Map<String, Double> aggregated = new HashMap<>();
    aggregated.put(BYTES_AT_BACKPRESSURE, 1.0);
    aggregated.put(COUNT_AT_BACKPRESSURE, 0.0);
    generateConnectionAnalyticMetricsAggregation(aggregated, mixedValuedPredictions);

    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(registry, aggregated, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, hasItems(0.0, 2.0));
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithoutResults() {
    // With an empty aggregate, both percent-used samples default to 0.0.
    final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
    final Map<String, Double> emptyAggregatedMetrics = new HashMap<>();

    PrometheusMetricsUtil.createAggregatedNifiMetrics(registry, emptyAggregatedMetrics, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertTrue(emptyAggregatedMetrics.isEmpty());
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PERCENT_USED_VALUE)));
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithSingleProcessGroup() {
    // A single empty connection aggregates to 0.0 for both published samples.
    final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
    final Map<String, Double> aggregated = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, aggregated);

    PrometheusMetricsUtil.createAggregatedNifiMetrics(registry, aggregated, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, everyItem(is(EXPECTED_DEFAULT_PERCENT_USED_VALUE)));
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithNestedProcessGroup() {
    // Nested groups: the maxima gathered across all levels are the published samples.
    final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
    final Map<String, Double> aggregated = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregated);

    PrometheusMetricsUtil.createAggregatedNifiMetrics(registry, aggregated, EMPTY, EMPTY, EMPTY, EMPTY);

    final List<Double> sampleValues = getSampleValuesList(registry);
    assertEquals(2, sampleValues.size());
    assertThat(sampleValues, hasItems(EXPECTED_NESTED_BYTES_PERCENT_VALUE, EXPECTED_NESTED_COUNT_PERCENT_VALUE));
}
private static ProcessGroupStatus createSingleProcessGroupStatus(final long queuedBytes, final long bytesThreshold, final int queuedCount, final long objectThreshold) {
ProcessGroupStatus singleStatus = new ProcessGroupStatus();
List<ConnectionStatus> connectionStatuses = new ArrayList<>();
ConnectionStatus connectionStatus = new ConnectionStatus();
connectionStatus.setQueuedBytes(queuedBytes);
connectionStatus.setBackPressureBytesThreshold(bytesThreshold);
connectionStatus.setQueuedCount(queuedCount);
connectionStatus.setBackPressureObjectThreshold(objectThreshold);
connectionStatuses.add(connectionStatus);
singleStatus.setConnectionStatus(connectionStatuses);
return singleStatus;
}
private static ProcessGroupStatus createNestedProcessGroupStatus() {
ProcessGroupStatus rootStatus = new ProcessGroupStatus();
ProcessGroupStatus status1 = createSingleProcessGroupStatus(15, 100, 10, 200);
ProcessGroupStatus status2 = createSingleProcessGroupStatus(150, 200, 5, 30);
status1.setProcessGroupStatus(Collections.singletonList(status2));
rootStatus.setProcessGroupStatus(Collections.singletonList(status1));
return rootStatus;
}
private static ProcessGroupStatus createNestedProcessGroupStatusWithCountBackpressure() {
ProcessGroupStatus rootStatus = new ProcessGroupStatus();
ProcessGroupStatus status1 = createSingleProcessGroupStatus(15, 100, 1, 1);
ProcessGroupStatus status2 = createSingleProcessGroupStatus(150, 200, 5, 30);
status1.setProcessGroupStatus(Collections.singletonList(status2));
rootStatus.setProcessGroupStatus(Collections.singletonList(status1));
return rootStatus;
}
private static Map<String, Map<String, Long>> createPredictionsWithMixedValue() {
Map<String, Map<String, Long>> predictions = new HashMap<>();
predictions.put(CONNECTION_1, new HashMap<String, Long>() {{
put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, Long.MAX_VALUE);
put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, Long.MAX_VALUE);
}});
predictions.put(CONNECTION_2, new HashMap<String, Long>() {{
put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE);
put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE);
}});
predictions.put(CONNECTION_3, new HashMap<String, Long>() {{
put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, 1L);
put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, 4L);
}});
predictions.put(CONNECTION_4, new HashMap<String, Long>() {{
put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, 3L);
put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, 2L);
}});
return predictions;
}
private static Map<String, Map<String, Long>> createPredictionsWithDefaultValuesOnly() {
Map<String, Map<String, Long>> predictions = new HashMap<>();
Map<String, Long> defaultPredictions = new HashMap<String, Long>() {{
put(TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE);
put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE);
}};
predictions.put(CONNECTION_1, defaultPredictions);
predictions.put(CONNECTION_2, defaultPredictions);
predictions.put(CONNECTION_3, defaultPredictions);
predictions.put(CONNECTION_4, defaultPredictions);
return predictions;
}
private static Set<String> createConnections() {
Set<String> connections = new HashSet<>();
connections.add(CONNECTION_1);
connections.add(CONNECTION_2);
connections.add(CONNECTION_3);
connections.add(CONNECTION_4);
return connections;
}
    // Convenience lookup for a single connection's prediction map; returns null
    // when the connection has no entry (same semantics as Map.get).
    private Map<String, Long> getPredictions(final Map<String, Map<String, Long>> predictions, final String connection) {
        return predictions.get(connection);
    }
private List<Double> getSampleValuesList(final AbstractMetricsRegistry metricsRegistry) {
return Collections.list(metricsRegistry.getRegistry().metricFamilySamples())
.stream()
.flatMap(familySamples -> familySamples.samples.stream())
.map(sample -> sample.value)
.collect(Collectors.toList());
}
private void generateConnectionAnalyticMetricsAggregation(final Map<String, Double> aggregatedMetrics, final Map<String, Map<String, Long>> predictions) {
for (final String connection : connections) {
PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, getPredictions(predictions, connection));
}
}
}