mirror of https://github.com/apache/nifi.git
NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping (#4156)
* NIFI-7273: Add flow metrics REST endpoint with for Prometheus scraping * NIFI-7273: Changed method name, fix handling when analytics not enabled * NIFI-7273: Removed attachment header from Prometheus metrics endpoint * NIFI-7273: Removed unused variable
This commit is contained in:
parent
c0f5fcb484
commit
a093af2d42
|
@ -0,0 +1,94 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-extension-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-prometheus-utils</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<description>
|
||||
This nifi-prometheus-utils module is designed to capture common patterns
|
||||
and utilities that can be leveraged by components that use Prometheus capabilities to
|
||||
help promote reuse. These patterns may become framework level features
|
||||
or may simply be made available through this utility. It is ok for this
|
||||
module to have dependencies but care should be taken when adding dependencies
|
||||
as this increases the cost of utilizing this module in various nars.
|
||||
</description>
|
||||
<properties>
|
||||
<prometheus.version>0.3.0</prometheus.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<!-- The client -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
</dependency>
|
||||
<!-- Hotspot JVM metrics -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_hotspot</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
</dependency>
|
||||
<!-- Exposition servlet -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_servlet</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<profile>
|
||||
<!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
|
||||
allow NiFi to be compiled on those JDKs. -->
|
||||
<id>jigsaw</id>
|
||||
<activation>
|
||||
<jdk>(1.8,)</jdk>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>javax.xml.bind</groupId>
|
||||
<artifactId>jaxb-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.xml.bind</groupId>
|
||||
<artifactId>jaxb-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.xml.bind</groupId>
|
||||
<artifactId>jaxb-impl</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
@ -15,13 +15,16 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.reporting.prometheus.api;
|
||||
package org.apache.nifi.prometheus.util;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.SimpleCollector;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -34,11 +37,11 @@ import io.prometheus.client.CollectorRegistry;
|
|||
import io.prometheus.client.Gauge;
|
||||
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.TransmissionStatus;
|
||||
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.metrics.jvm.JvmMetrics;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
|
||||
public class PrometheusMetricsUtil {
|
||||
|
||||
|
@ -51,6 +54,10 @@ public class PrometheusMetricsUtil {
|
|||
|
||||
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
|
||||
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
|
||||
private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry();
|
||||
private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
|
||||
|
||||
public static final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
|
||||
|
||||
// Common properties/values
|
||||
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
|
||||
|
@ -81,15 +88,6 @@ public class PrometheusMetricsUtil {
|
|||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
|
||||
.name("prometheus-reporting-task-ssl-context")
|
||||
.displayName("SSL Context Service")
|
||||
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
|
||||
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
|
||||
.name("prometheus-reporting-task-client-auth")
|
||||
.displayName("Client Authentication")
|
||||
|
@ -137,6 +135,16 @@ public class PrometheusMetricsUtil {
|
|||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
||||
.register(NIFI_REGISTRY);
|
||||
|
||||
private static final Counter TOTAL_BYTES_READ = Counter.build().name("nifi_total_bytes_read")
|
||||
.help("Total number of bytes read by the component")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
||||
.register(NIFI_REGISTRY);
|
||||
|
||||
private static final Counter TOTAL_BYTES_WRITTEN = Counter.build().name("nifi_total_bytes_written")
|
||||
.help("Total number of bytes written by the component")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
|
||||
.register(NIFI_REGISTRY);
|
||||
|
||||
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
|
||||
.name("nifi_amount_bytes_written")
|
||||
.help("Total number of bytes written by the component")
|
||||
|
@ -267,6 +275,42 @@ public class PrometheusMetricsUtil {
|
|||
"source_id", "source_name", "destination_id", "destination_name")
|
||||
.register(NIFI_REGISTRY);
|
||||
|
||||
// Connection status analytics metrics
|
||||
private static final Gauge TIME_TO_BYTES_BACKPRESSURE_PREDICTION = Gauge.build()
|
||||
.name("nifi_time_to_bytes_backpressure_prediction")
|
||||
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to bytes in the queue")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
|
||||
"source_id", "source_name", "destination_id", "destination_name")
|
||||
.register(CONNECTION_ANALYTICS_REGISTRY);
|
||||
|
||||
private static final Gauge TIME_TO_COUNT_BACKPRESSURE_PREDICTION = Gauge.build()
|
||||
.name("nifi_time_to_count_backpressure_prediction")
|
||||
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to number of objects in the queue")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
|
||||
"source_id", "source_name", "destination_id", "destination_name")
|
||||
.register(CONNECTION_ANALYTICS_REGISTRY);
|
||||
|
||||
private static final Gauge BYTES_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
|
||||
.name("nifi_bytes_at_next_interval_prediction")
|
||||
.help("Predicted number of bytes in the queue at the next configured interval")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
|
||||
"source_id", "source_name", "destination_id", "destination_name")
|
||||
.register(CONNECTION_ANALYTICS_REGISTRY);
|
||||
|
||||
private static final Gauge COUNT_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
|
||||
.name("nifi_count_at_next_interval_prediction")
|
||||
.help("Predicted number of objects in the queue at the next configured interval")
|
||||
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
|
||||
"source_id", "source_name", "destination_id", "destination_name")
|
||||
.register(CONNECTION_ANALYTICS_REGISTRY);
|
||||
|
||||
private static final Gauge BULLETIN = Gauge.build()
|
||||
.name("nifi_bulletin")
|
||||
.help("Bulletin reported by the NiFi instance")
|
||||
.labelNames("instance", "component_type", "component_id", "parent_id",
|
||||
"node_address", "category", "source_name", "source_id", "level")
|
||||
.register(BULLETIN_REGISTRY);
|
||||
|
||||
///////////////////////////////////////////////////////////////
|
||||
// JVM Metrics
|
||||
///////////////////////////////////////////////////////////////
|
||||
|
@ -350,6 +394,8 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
|
||||
AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
|
||||
AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
|
||||
TOTAL_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesRead());
|
||||
TOTAL_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesWritten());
|
||||
AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
|
||||
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
|
||||
|
||||
|
@ -399,6 +445,8 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
|
||||
AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
|
||||
AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
|
||||
TOTAL_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesRead());
|
||||
TOTAL_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesWritten());
|
||||
AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
|
||||
|
||||
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
|
||||
|
@ -463,6 +511,8 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
|
||||
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
|
||||
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
|
||||
TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
|
||||
TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
|
||||
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
|
||||
|
||||
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
|
||||
|
@ -487,6 +537,8 @@ public class PrometheusMetricsUtil {
|
|||
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
|
||||
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
|
||||
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
|
||||
TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead());
|
||||
TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten());
|
||||
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
|
||||
|
||||
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
|
||||
|
@ -552,4 +604,27 @@ public class PrometheusMetricsUtil {
|
|||
return JVM_REGISTRY;
|
||||
}
|
||||
|
||||
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instanceId, String connComponentType, String connComponentName,
|
||||
String connComponentId, String parentId, String sourceId, String sourceName, String destinationId, String destinationName) {
|
||||
if(statusAnalytics != null) {
|
||||
Map<String, Long> predictions = statusAnalytics.getPredictions();
|
||||
TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(predictions.get("timeToBytesBackpressureMillis"));
|
||||
TIME_TO_COUNT_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(predictions.get("timeToCountBackpressureMillis"));
|
||||
BYTES_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(predictions.get("nextIntervalBytes"));
|
||||
COUNT_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
|
||||
.set(predictions.get("nextIntervalCount"));
|
||||
}
|
||||
|
||||
return CONNECTION_ANALYTICS_REGISTRY;
|
||||
}
|
||||
|
||||
public static CollectorRegistry createBulletinMetrics(String instanceId, String componentType, String componentId, String parentId, String nodeAddress,
|
||||
String category, String sourceName, String sourceId, String level) {
|
||||
|
||||
BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1);
|
||||
return BULLETIN_REGISTRY;
|
||||
}
|
||||
}
|
|
@ -35,6 +35,7 @@
|
|||
<module>nifi-database-utils</module>
|
||||
<module>nifi-database-test-utils</module>
|
||||
<module>nifi-service-utils</module>
|
||||
<module>nifi-prometheus-utils</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -256,6 +256,11 @@
|
|||
<artifactId>nifi-authorizer</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-prometheus-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
|
|
|
@ -314,6 +314,11 @@ public interface NiFiServiceFacade {
|
|||
*/
|
||||
FlowConfigurationEntity getFlowConfiguration();
|
||||
|
||||
/**
|
||||
* Gets the metrics for the flow.
|
||||
*/
|
||||
void generateFlowMetrics();
|
||||
|
||||
/**
|
||||
* Updates the configuration for this controller.
|
||||
*
|
||||
|
|
|
@ -94,12 +94,14 @@ import org.apache.nifi.groups.RemoteProcessGroup;
|
|||
import org.apache.nifi.history.History;
|
||||
import org.apache.nifi.history.HistoryQuery;
|
||||
import org.apache.nifi.history.PreviousValue;
|
||||
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
|
||||
import org.apache.nifi.nar.ExtensionManager;
|
||||
import org.apache.nifi.parameter.Parameter;
|
||||
import org.apache.nifi.parameter.ParameterContext;
|
||||
import org.apache.nifi.parameter.ParameterDescriptor;
|
||||
import org.apache.nifi.parameter.ParameterReferenceManager;
|
||||
import org.apache.nifi.parameter.StandardParameterContext;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.registry.ComponentVariableRegistry;
|
||||
import org.apache.nifi.registry.authorization.Permissions;
|
||||
import org.apache.nifi.registry.bucket.Bucket;
|
||||
|
@ -5294,6 +5296,56 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void generateFlowMetrics() {
|
||||
|
||||
String instanceId = controllerFacade.getInstanceId();
|
||||
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
|
||||
PrometheusMetricsUtil.createNifiMetrics(rootPGStatus, instanceId, "", "RootProcessGroup",
|
||||
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
|
||||
PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId);
|
||||
|
||||
// Get Connection Status Analytics (predictions, e.g.)
|
||||
Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
|
||||
for (Connection c : connections) {
|
||||
// If a ResourceNotFoundException is thrown, analytics hasn't been enabled
|
||||
try {
|
||||
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()),
|
||||
instanceId,
|
||||
"Connection",
|
||||
c.getName(),
|
||||
c.getIdentifier(),
|
||||
c.getProcessGroup().getIdentifier(),
|
||||
c.getSource().getName(),
|
||||
c.getSource().getIdentifier(),
|
||||
c.getDestination().getName(),
|
||||
c.getDestination().getIdentifier()
|
||||
);
|
||||
} catch (ResourceNotFoundException rnfe) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Create a query to get all bulletins
|
||||
final BulletinQueryDTO query = new BulletinQueryDTO();
|
||||
BulletinBoardDTO bulletinBoardDTO = getBulletinBoard(query);
|
||||
for(BulletinEntity bulletinEntity : bulletinBoardDTO.getBulletins()) {
|
||||
BulletinDTO bulletin = bulletinEntity.getBulletin();
|
||||
if(bulletin != null) {
|
||||
PrometheusMetricsUtil.createBulletinMetrics(instanceId,
|
||||
"Bulletin",
|
||||
String.valueOf(bulletin.getId()),
|
||||
bulletin.getGroupId() == null ? "" : bulletin.getGroupId(),
|
||||
bulletin.getNodeAddress() == null ? "" : bulletin.getNodeAddress(),
|
||||
bulletin.getCategory(),
|
||||
bulletin.getSourceName(),
|
||||
bulletin.getSourceId(),
|
||||
bulletin.getLevel()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClustered() {
|
||||
return controllerFacade.isClustered();
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.web.api;
|
||||
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.common.TextFormat;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
|
@ -43,6 +45,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
|
|||
import org.apache.nifi.controller.service.ControllerServiceState;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.nar.NarClassLoadersHolder;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.registry.client.NiFiRegistryException;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.web.IllegalClusterResourceRequestException;
|
||||
|
@ -129,6 +132,10 @@ import javax.ws.rs.WebApplicationException;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.text.Collator;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
|
@ -381,6 +388,64 @@ public class FlowResource extends ApplicationResource {
|
|||
return generateOkResponse(entity).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the metrics of the entire flow.
|
||||
*
|
||||
* @return A flowMetricsEntity.
|
||||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
@GET
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces(MediaType.WILDCARD)
|
||||
@Path("metrics/{producer}")
|
||||
@ApiOperation(
|
||||
value = "Gets all metrics for the flow from a particular node",
|
||||
response = StreamingOutput.class,
|
||||
authorizations = {
|
||||
@Authorization(value = "Read - /flow")
|
||||
}
|
||||
)
|
||||
@ApiResponses(
|
||||
value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response getFlowMetrics(
|
||||
@ApiParam(
|
||||
value = "The producer for flow file metrics. Each producer may have its own output format.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("producer") final String producer) throws InterruptedException {
|
||||
|
||||
authorizeFlow();
|
||||
|
||||
if ("prometheus".equalsIgnoreCase(producer)) {
|
||||
// get this process group flow
|
||||
serviceFacade.generateFlowMetrics();
|
||||
// generate a streaming response
|
||||
final StreamingOutput response = output -> {
|
||||
Writer writer = new BufferedWriter(new OutputStreamWriter(output));
|
||||
for (CollectorRegistry collectorRegistry : PrometheusMetricsUtil.ALL_REGISTRIES) {
|
||||
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
|
||||
// flush the response
|
||||
output.flush();
|
||||
}
|
||||
writer.flush();
|
||||
writer.close();
|
||||
};
|
||||
|
||||
return generateOkResponse(response)
|
||||
.type(MediaType.TEXT_PLAIN_TYPE)
|
||||
.build();
|
||||
} else {
|
||||
throw new ResourceNotFoundException("The specified producer is missing or invalid.");
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------
|
||||
// controller services
|
||||
// -------------------
|
||||
|
|
|
@ -16,28 +16,11 @@
|
|||
<artifactId>nifi-prometheus-bundle</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<properties>
|
||||
<prometheus.version>0.3.0</prometheus.version>
|
||||
</properties>
|
||||
|
||||
<artifactId>nifi-prometheus-reporting-task</artifactId>
|
||||
<description>Prometheus /metrics http endpoint for monitoring</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-sink-api</artifactId>
|
||||
|
@ -53,23 +36,10 @@
|
|||
<artifactId>nifi-record</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<!-- The client -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
</dependency>
|
||||
<!-- Hotspot JVM metrics -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_hotspot</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
</dependency>
|
||||
<!-- Exposition servlet -->
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient_servlet</artifactId>
|
||||
<version>${prometheus.version}</version>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-prometheus-utils</artifactId>
|
||||
<version>1.12.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.nifi.controller.AbstractControllerService;
|
|||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
@ -36,6 +36,7 @@ import org.apache.nifi.serialization.record.RecordField;
|
|||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
|
@ -62,13 +63,22 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
|
|||
private volatile Map<String, Gauge> gauges;
|
||||
private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry();
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
|
||||
.name("prometheus-reporting-task-ssl-context")
|
||||
.displayName("SSL Context Service")
|
||||
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
|
||||
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
|
||||
props.add(PrometheusMetricsUtil.INSTANCE_ID);
|
||||
props.add(PrometheusMetricsUtil.SSL_CONTEXT);
|
||||
props.add(SSL_CONTEXT);
|
||||
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
@ -81,7 +91,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
|
|||
@OnEnabled
|
||||
public void onScheduled(final ConfigurationContext context) {
|
||||
RECORD_REGISTRY.clear();
|
||||
SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
|
||||
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
|
||||
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
|
||||
|
||||
try {
|
||||
|
|
|
@ -36,14 +36,15 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
|
|||
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
|
||||
import org.apache.nifi.reporting.AbstractReportingTask;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
|
||||
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
|
||||
import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
|
||||
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
|
||||
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
|
||||
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
|
||||
|
||||
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
|
||||
@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
|
||||
|
@ -53,6 +54,15 @@ public class PrometheusReportingTask extends AbstractReportingTask {
|
|||
|
||||
private PrometheusServer prometheusServer;
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
|
||||
.name("prometheus-reporting-task-ssl-context")
|
||||
.displayName("SSL Context Service")
|
||||
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
|
||||
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("prometheus-reporting-task-metrics-strategy")
|
||||
.displayName("Metrics Reporting Strategy")
|
||||
|
@ -79,7 +89,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
|
|||
props.add(PrometheusMetricsUtil.INSTANCE_ID);
|
||||
props.add(METRICS_STRATEGY);
|
||||
props.add(SEND_JVM_METRICS);
|
||||
props.add(PrometheusMetricsUtil.SSL_CONTEXT);
|
||||
props.add(SSL_CONTEXT);
|
||||
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
@ -91,7 +101,7 @@ public class PrometheusReportingTask extends AbstractReportingTask {
|
|||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ConfigurationContext context) {
|
||||
SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
|
||||
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
|
||||
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
|
||||
|
||||
try {
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.nifi.controller.ConfigurationContext;
|
|||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.ListRecordSet;
|
||||
|
@ -128,7 +128,7 @@ public class TestPrometheusRecordSink {
|
|||
|
||||
final PropertyValue pValue = mock(StandardPropertyValue.class);
|
||||
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString));
|
||||
when(context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT)).thenReturn(pValue);
|
||||
when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
|
||||
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
|
||||
|
||||
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.nifi.controller.status.PortStatus;
|
|||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.RunStatus;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
|
|
Loading…
Reference in New Issue