NIFI-13507 Removed nifi-prometheus-bundle

- Removed StandardGangliaReporter
- Removed PrometheusReportingTask
- Removed PrometheusRecordSink

This closes #9044

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Joseph Witt 2024-07-04 16:01:08 -07:00 committed by exceptionfactory
parent 396c8450d0
commit 4ff70dd233
No known key found for this signature in database
40 changed files with 30 additions and 2400 deletions

View File

@ -40,6 +40,5 @@ limitations under the License.
</modules>
<properties>
<system.rules.version>1.19.0</system.rules.version>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
</properties>
</project>

View File

@ -406,12 +406,6 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-network-processors-nar</artifactId>

View File

@ -870,11 +870,6 @@
<artifactId>nifi-listed-entity</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-put-pattern</artifactId>
@ -1183,11 +1178,6 @@
<artifactId>nifi-poi-services</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-reporting-task</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-persistent-provenance-repository</artifactId>

View File

@ -93,7 +93,7 @@ UI may become unavailable.
*Controller Service*: Controller Services are extension points that, after being added and configured by a DFM in the User Interface, will start up when NiFi starts up and provide information for use by other components (such as processors or other controller services). A common Controller Service used by several components is the StandardSSLContextService. It provides the ability to configure keystore and/or truststore properties once and reuse that configuration throughout the application. The idea is that, rather than configure this information in every processor that might need it, the controller service provides it for any processor to use as needed.
*Reporting Task*: Reporting Tasks run in the background to provide statistical reports about what is happening in the NiFi instance. The DFM adds and configures Reporting Tasks in the User Interface as desired. Common reporting tasks include the ControllerStatusReportingTask, MonitorDiskUsage reporting task, MonitorMemory reporting task, and the StandardGangliaReporter.
*Reporting Task*: Reporting Tasks run in the background to provide statistical reports about what is happening in the NiFi instance. The DFM adds and configures Reporting Tasks in the User Interface as desired. Common reporting tasks include the ControllerStatusReportingTask, MonitorDiskUsage reporting task, and the MonitorMemory reporting task.
*Flow Analysis Rules*: Flow Analysis Rules can analyze components or (parts of) the flow. They may produce rule violations which can help adjust or maintain optimal flow design. The DFM adds and configures Flow Analysis Rules in the User Interface as desired.

View File

@ -1,55 +0,0 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-prometheus-utils</artifactId>
<packaging>jar</packaging>
<description>
This nifi-prometheus-utils module is designed to capture common patterns
and utilities that can be leveraged by components that use Prometheus capabilities to
help promote reuse. These patterns may become framework level features
or may simply be made available through this utility. It is ok for this
module to have dependencies but care should be taken when adding dependencies
as this increases the cost of utilizing this module in various nars.
</description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-metrics</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<!-- The client -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<!-- Hotspot JVM metrics -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<!-- Exposition servlet -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_servlet</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -40,7 +40,6 @@
<module>nifi-kerberos-test-utils</module>
<module>nifi-listed-entity</module>
<module>nifi-migration-utils</module>
<module>nifi-prometheus-utils</module>
<module>nifi-put-pattern</module>
<module>nifi-record-path-property</module>
<module>nifi-record-utils</module>

View File

@ -1,38 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-prometheus-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-reporting-task</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -1,208 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017 nifi
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.

View File

@ -1,76 +0,0 @@
Copyright 2015-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Dropwizard Metrics
The following NOTICE information applies:
Metrics
Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
This product includes software developed by Coda Hale and Yammer, Inc.
This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
LongAdder), which was released with the following comments:
Written by Doug Lea with assistance from members of JCP JSR-166
Expert Group and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/
The derived work in the nifi-metrics module is adapted from
https://github.com/dropwizard/metrics/blob/v2.2.0/metrics-core/src/main/java/com/yammer/metrics/core/VirtualMachineMetrics.java
and can be found in
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
(ASLv2) Prometheus Simple Client libraries
Copyright 2012-2019 The Prometheus Authors
(ASLv2) Jetty
The following NOTICE information applies:
Jetty Web Container
Copyright 1995-2017 Mort Bay Consulting Pty Ltd.
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2017 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
************
MIT
************
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/

View File

@ -1,77 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-prometheus-reporting-task</artifactId>
<description>Prometheus /metrics http endpoint for monitoring</description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,229 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@Tags({"record", "send", "write", "prometheus"})
@CapabilityDescription("Specifies a Record Sink Service that exposes data points to a Prometheus scraping service. Numeric fields are exposed as Gauges, String fields are the "
+ "label values for the gauges, and all other fields are ignored.")
public class PrometheusRecordSink extends AbstractControllerService implements RecordSinkService {
private volatile PrometheusServer prometheusServer;
private volatile RecordSchema recordSchema;
private volatile String[] labelNames;
private volatile Map<String, Gauge> gauges;
private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-ssl-context")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
props.add(PrometheusMetricsUtil.INSTANCE_ID);
props.add(SSL_CONTEXT);
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onScheduled(final ConfigurationContext context) {
RECORD_REGISTRY.clear();
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).evaluateAttributeExpressions().getValue();
try {
List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
if (sslContextService == null) {
prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger());
} else {
final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) {
need = true;
want = false;
} else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) {
need = false;
want = true;
} else {
need = false;
want = false;
}
prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want);
}
Function<ReportingContext, CollectorRegistry> nifiMetrics = (reportingContext) -> RECORD_REGISTRY;
metricsCollectors.add(nifiMetrics);
prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started Jetty server");
} catch (Exception e) {
// Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
throw new ProcessException("Failed to start Jetty server", e);
}
}
@Override
public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
WriteResult writeResult = null;
if (recordSchema == null) {
// The first time through, create the registry, then create the Gauges and register them
recordSchema = recordSet.getSchema();
RECORD_REGISTRY.clear();
// String fields are labels, collect them first
labelNames = recordSchema.getFields().stream().filter(
(f) -> isLabel(f.getDataType().getFieldType())).map(RecordField::getFieldName).toArray(String[]::new);
gauges = new HashMap<>();
recordSchema.getFields().stream().filter((field) -> isNumeric(field.getDataType().getFieldType())).forEach(
// Create, register, and add gauge to the list
(field) -> gauges.put(field.getFieldName(), Gauge.build()
.name(field.getFieldName())
.help("Metric for " + field.getFieldName())
.labelNames(labelNames)
.register(RECORD_REGISTRY))
);
}
int recordCount = 0;
Record r;
while ((r = recordSet.next()) != null) {
final Record record = r;
// Get label values, set empty strings for null values
String[] labelValues = Arrays.stream(labelNames).map((labelName) -> {
String value = record.getAsString(labelName);
return (value != null) ? value : "";
}).toArray(String[]::new);
// Get value for each gauge and update the data point
gauges.forEach((name, gauge) -> {
Optional<DataType> dataType = record.getSchema().getDataType(name);
if (dataType.isPresent()) {
RecordFieldType recordFieldType = dataType.get().getFieldType();
// Change boolean fields to doubles
final double value;
if (RecordFieldType.BOOLEAN.equals(recordFieldType)) {
value = record.getAsBoolean(name) ? 1.0 : 0.0;
} else {
value = record.getAsDouble(name);
}
gauge.labels(labelValues).set(value);
}
});
recordCount++;
}
attributes.put("record.count", Integer.toString(recordCount));
writeResult = WriteResult.of(recordCount, attributes);
return writeResult;
}
@OnDisabled
public void onStopped() throws Exception {
if (prometheusServer != null) {
Server server = prometheusServer.getServer();
if (server != null) {
server.stop();
}
}
recordSchema = null;
}
@OnShutdown
public void onShutDown() throws Exception {
if (prometheusServer != null) {
Server server = prometheusServer.getServer();
if (server != null) {
server.stop();
}
}
recordSchema = null;
}
@Override
public void reset() {
// Reset the schema in order to support different RecordSet schemas
recordSchema = null;
}
private boolean isNumeric(RecordFieldType dataType) {
// Numeric fields are metrics
return RecordFieldType.INT.equals(dataType)
|| RecordFieldType.SHORT.equals(dataType)
|| RecordFieldType.LONG.equals(dataType)
|| RecordFieldType.BIGINT.equals(dataType)
|| RecordFieldType.FLOAT.equals(dataType)
|| RecordFieldType.DOUBLE.equals(dataType)
|| RecordFieldType.DECIMAL.equals(dataType)
|| RecordFieldType.BOOLEAN.equals(dataType);
}
/**
 * Returns true when the given field type should become a Prometheus label
 * dimension (string-like fields) rather than a metric value.
 */
private boolean isLabel(RecordFieldType dataType) {
    final RecordFieldType[] labelTypes = {RecordFieldType.STRING, RecordFieldType.CHAR};
    for (RecordFieldType labelType : labelTypes) {
        if (labelType.equals(dataType)) {
            return true;
        }
    }
    return false;
}
}

View File

@ -1,201 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.eclipse.jetty.server.Server;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating a /metrics HTTP(S) endpoint which can be used for external monitoring of the application."
        + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance. Note that if the underlying Jetty server (i.e. the "
        + "Prometheus endpoint) cannot be started (for example if two PrometheusReportingTask instances are started on the same port), this may cause a delay in "
        + "shutting down NiFi while it waits for the server resources to be cleaned up.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec")
public class PrometheusReportingTask extends AbstractReportingTask {

    // Embedded Jetty server hosting the /metrics endpoint; created in onScheduled(),
    // stopped in OnStopped()/onShutDown().
    private PrometheusServer prometheusServer;

    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-ssl-context")
            .displayName("SSL Context Service")
            // Fixed: the two description fragments previously concatenated to "willaccept"
            // (missing separating space between the literals).
            .description("The SSL Context Service to use in order to secure the server. If specified, the server will"
                    + " accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
            .required(false)
            .identifiesControllerService(RestrictedSSLContextService.class)
            .build();

    public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-metrics-strategy")
            .displayName("Metrics Reporting Strategy")
            .description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components")
            .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS)
            .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
            .required(true)
            .build();

    public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-metrics-send-jvm")
            .displayName("Send JVM metrics")
            .description("Send JVM metrics in addition to the NiFi metrics")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    // Supported property descriptors, fixed at class-load time.
    private static final List<PropertyDescriptor> properties;

    static {
        List<PropertyDescriptor> props = new ArrayList<>();
        props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
        props.add(PrometheusMetricsUtil.INSTANCE_ID);
        props.add(METRICS_STRATEGY);
        props.add(SEND_JVM_METRICS);
        props.add(SSL_CONTEXT);
        props.add(PrometheusMetricsUtil.CLIENT_AUTH);
        properties = Collections.unmodifiableList(props);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Starts the embedded Jetty /metrics endpoint (HTTP, or HTTPS when an SSL Context
     * Service is configured) and registers the metric collector functions that are
     * evaluated on every scrape.
     *
     * @throws ProcessException if the Jetty server cannot be started, so that the
     *         framework never invokes onTrigger against a dead endpoint
     */
    @OnScheduled
    public void onScheduled(final ConfigurationContext context) {
        SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).evaluateAttributeExpressions().getValue();
        try {
            List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
            if (sslContextService == null) {
                this.prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger());
            } else {
                // Map the configured client-auth mode onto Jetty's need/want flags.
                final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue();
                final boolean need;
                final boolean want;
                if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) {
                    need = true;
                    want = false;
                } else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) {
                    need = false;
                    want = true;
                } else {
                    need = false;
                    want = false;
                }
                this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want);
            }
            // Collector for NiFi flow metrics, evaluated per scrape against the most
            // recent ReportingContext supplied by onTrigger().
            Function<ReportingContext, CollectorRegistry> nifiMetrics = (reportingContext) -> {
                EventAccess eventAccess = reportingContext.getEventAccess();
                ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
                String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
                if (instanceId == null) {
                    instanceId = "";
                }
                String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue();
                NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
                CollectorRegistry collectorRegistry = PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy);
                // Add the total byte counts (read/written/sent/received) to the NiFi metrics registry
                final String rootPGId = StringUtils.isEmpty(rootGroupStatus.getId()) ? "" : rootGroupStatus.getId();
                final String rootPGName = StringUtils.isEmpty(rootGroupStatus.getName()) ? "" : rootGroupStatus.getName();
                nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesRead(), "TOTAL_BYTES_READ",
                        instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
                nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesWritten(), "TOTAL_BYTES_WRITTEN",
                        instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
                nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesSent(), "TOTAL_BYTES_SENT",
                        instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
                nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesReceived(), "TOTAL_BYTES_RECEIVED",
                        instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
                return collectorRegistry;
            };
            metricsCollectors.add(nifiMetrics);
            if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {
                // Optional collector for JVM-level metrics.
                Function<ReportingContext, CollectorRegistry> jvmMetrics = (reportingContext) -> {
                    String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
                    JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
                    return PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
                };
                metricsCollectors.add(jvmMetrics);
            }
            this.prometheusServer.setMetricsCollectors(metricsCollectors);
            getLogger().info("Started Jetty server");
        } catch (Exception e) {
            // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
            throw new ProcessException("Failed to start Jetty server", e);
        }
    }

    // NOTE(review): nonstandard capitalization ("OnStopped" rather than "onStopped") is
    // retained because existing callers (e.g. the integration test) invoke it by name;
    // the @OnStopped annotation, not the method name, drives framework invocation.
    @OnStopped
    public void OnStopped() throws Exception {
        if (prometheusServer != null) {
            Server server = prometheusServer.getServer();
            if (server != null) {
                server.stop();
            }
        }
    }

    /** Stops the Jetty endpoint when NiFi shuts down. */
    @OnShutdown
    public void onShutDown() throws Exception {
        if (prometheusServer != null) {
            Server server = prometheusServer.getServer();
            if (server != null) {
                server.stop();
            }
        }
    }

    /**
     * No data is pushed anywhere on trigger; each trigger merely refreshes the
     * ReportingContext that the metrics servlet reads at scrape time.
     */
    @Override
    public void onTrigger(final ReportingContext context) {
        if (prometheusServer != null) {
            prometheusServer.setReportingContext(context);
        }
    }
}

View File

@ -1,159 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import javax.net.ssl.SSLContext;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
/**
 * Embedded Jetty server that exposes registered Prometheus collectors at /metrics
 * in the Prometheus text exposition format (0.0.4), over plain HTTP or HTTPS.
 */
public class PrometheusServer {
    // Fixed: this field was previously static and reassigned by every constructor, so
    // two concurrently running servers would both log through whichever instance was
    // constructed last. It is now a per-instance field.
    private final ComponentLog logger;
    private final Server server;
    private final ServletContextHandler handler;
    private ReportingContext context;
    private List<Function<ReportingContext, CollectorRegistry>> metricsCollectors;

    /** Servlet that renders every registered collector into the scrape response. */
    class MetricsServlet extends HttpServlet {

        @Override
        protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
            logger.debug("PrometheusServer doGet() called");
            ServletOutputStream response = resp.getOutputStream();
            OutputStreamWriter osw = new OutputStreamWriter(response, StandardCharsets.UTF_8);
            for (Function<ReportingContext, CollectorRegistry> mc : metricsCollectors) {
                CollectorRegistry collectorRegistry = mc.apply(getReportingContext());
                TextFormat.write004(osw, collectorRegistry.metricFamilySamples());
            }
            // These must be set BEFORE osw.flush() because osw.flush() commits resp which blocks any set calls.
            resp.setHeader("Content-Type", TextFormat.CONTENT_TYPE_004);
            resp.setStatus(HttpURLConnection.HTTP_OK);
            osw.flush();
            osw.close();
            response.flush();
            response.close();
            resp.flushBuffer();
        }
    }

    /** Plain-HTTP server bound to the given address; starts Jetty immediately. */
    public PrometheusServer(InetSocketAddress addr, ComponentLog logger) throws Exception {
        this.logger = logger;
        metricsCollectors = Collections.emptyList();
        this.server = new Server(addr);
        this.handler = new ServletContextHandler("/metrics");
        this.handler.addServlet(new ServletHolder(new MetricsServlet()), "/");
        this.server.setHandler(this.handler);
        try {
            this.server.start();
        } catch (Exception e) {
            // If Jetty couldn't start, stop it explicitly to avoid dangling threads
            logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually", e);
            this.server.stop();
            throw e;
        }
    }

    /** HTTPS server on the given port, secured via the supplied SSL Context Service. */
    public PrometheusServer(int addr, SSLContextService sslContextService, ComponentLog logger, boolean needClientAuth, boolean wantClientAuth) throws Exception {
        this.logger = logger;
        // Fixed: previously left null here, so a scrape arriving before
        // setMetricsCollectors() would NPE in MetricsServlet.doGet().
        metricsCollectors = Collections.emptyList();
        this.server = new Server();
        this.handler = new ServletContextHandler("/metrics");
        this.handler.addServlet(new ServletHolder(new MetricsServlet()), "/");
        this.server.setHandler(this.handler);
        SslContextFactory.Server sslFactory = createSslFactory(sslContextService, needClientAuth, wantClientAuth);
        HttpConfiguration httpsConfiguration = new HttpConfiguration();
        httpsConfiguration.setSecureScheme("https");
        httpsConfiguration.setSecurePort(addr);
        httpsConfiguration.addCustomizer(new SecureRequestCustomizer());
        ServerConnector https = new ServerConnector(server, new SslConnectionFactory(sslFactory, "http/1.1"),
                new HttpConnectionFactory(httpsConfiguration));
        https.setPort(addr);
        this.server.setConnectors(new Connector[]{https});
        try {
            this.server.start();
        } catch (Exception e) {
            // If Jetty couldn't start, stop it explicitly to avoid dangling threads
            logger.debug("PrometheusServer: Couldn't start Jetty server, stopping manually", e);
            this.server.stop();
            throw e;
        }
    }

    /** Builds the Jetty SSL factory from the controller service's SSLContext. */
    private SslContextFactory.Server createSslFactory(final SSLContextService sslService, boolean needClientAuth, boolean wantClientAuth) {
        final SslContextFactory.Server sslFactory = new SslContextFactory.Server();
        sslFactory.setNeedClientAuth(needClientAuth);
        sslFactory.setWantClientAuth(wantClientAuth);
        final SSLContext sslContext = sslService.createContext();
        sslFactory.setSslContext(sslContext);
        return sslFactory;
    }

    public Server getServer() {
        return this.server;
    }

    public ReportingContext getReportingContext() {
        return this.context;
    }

    public void setReportingContext(ReportingContext rc) {
        this.context = rc;
    }

    public List<Function<ReportingContext, CollectorRegistry>> getMetricsCollectors() {
        return metricsCollectors;
    }

    public void setMetricsCollectors(List<Function<ReportingContext, CollectorRegistry>> metricsCollectors) {
        this.metricsCollectors = metricsCollectors;
    }
}

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.prometheus.PrometheusRecordSink

View File

@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.prometheus.PrometheusReportingTask

View File

@ -1,47 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PrometheusReportingTask</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h2>PrometheusReportingTask</h2>
<p>This ReportingTask exposes the following metrics at its <code>/metrics</code> endpoint for Prometheus to scrape:</p>
<ul>
<li>FlowFilesReceivedLast5Minutes</li>
<li>BytesReceivedLast5Minutes</li>
<li>FlowFilesSentLast5Minutes</li>
<li>BytesSentLast5Minutes</li>
<li>FlowFilesQueued</li>
<li>BytesQueued</li>
<li>BytesReadLast5Minutes</li>
<li>BytesWrittenLast5Minutes</li>
<li>ActiveThreads</li>
<li>TotalTaskDurationSeconds</li>
<li>jvm.heap_used</li>
<li>jvm.heap_usage</li>
<li>jvm.non_heap_usage</li>
<li>jvm.thread_count</li>
<li>jvm.daemon_thread_count</li>
<li>jvm.file_descriptor_usage</li>
</ul>
</body>
</html>

View File

@ -1,184 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockReportingContext;
import org.apache.nifi.util.MockReportingInitializationContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
 * Integration test for PrometheusReportingTask: starts the real embedded Jetty
 * endpoint and scrapes it over HTTP at localhost:9092 (presumably the default of
 * PrometheusMetricsUtil.METRICS_ENDPOINT_PORT — confirm there).
 */
public class PrometheusReportingTaskIT {
    private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
    private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
    private static final String TEST_TASK_ID = "test-task-id";
    private MockReportingInitializationContext reportingInitContextStub;
    private MockReportingContext reportingContextStub;
    // Configuration context built from the reporting context's properties; used for onScheduled().
    private MockConfigurationContext configurationContextStub;
    private PrometheusReportingTask testedReportingTask;
    private ProcessGroupStatus rootGroupStatus;

    @BeforeEach
    public void setup() {
        testedReportingTask = new PrometheusReportingTask();
        rootGroupStatus = new ProcessGroupStatus();
        reportingInitContextStub = new MockReportingInitializationContext(TEST_INIT_CONTEXT_ID, TEST_INIT_CONTEXT_NAME,
                new MockComponentLog(TEST_TASK_ID, testedReportingTask));
        reportingContextStub = new MockReportingContext(Collections.emptyMap(), new MockStateManager(testedReportingTask));
        // "localhost" becomes the instance label on every exported metric (asserted below).
        reportingContextStub.setProperty(PrometheusMetricsUtil.INSTANCE_ID.getName(), "localhost");
        configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(),
                reportingContextStub.getControllerServiceLookup(), null);
        // Root process group counters whose values are asserted against the scrape output.
        rootGroupStatus.setId("1234");
        rootGroupStatus.setFlowFilesReceived(5);
        rootGroupStatus.setBytesReceived(10000);
        rootGroupStatus.setFlowFilesSent(10);
        rootGroupStatus.setBytesSent(20000);
        rootGroupStatus.setQueuedCount(100);
        rootGroupStatus.setQueuedContentSize(1024L);
        rootGroupStatus.setBytesRead(60000L);
        rootGroupStatus.setBytesWritten(80000L);
        rootGroupStatus.setActiveThreadCount(5);
        rootGroupStatus.setName("root");
        rootGroupStatus.setFlowFilesTransferred(5);
        rootGroupStatus.setBytesTransferred(10000);
        rootGroupStatus.setOutputContentSize(1000L);
        rootGroupStatus.setInputContentSize(1000L);
        rootGroupStatus.setOutputCount(100);
        rootGroupStatus.setInputCount(1000);
        PortStatus outputPortStatus = new PortStatus();
        outputPortStatus.setId("9876");
        outputPortStatus.setName("out");
        outputPortStatus.setGroupId("1234");
        outputPortStatus.setRunStatus(RunStatus.Stopped);
        outputPortStatus.setActiveThreadCount(1);
        rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
        // Create a nested group status so per-process-group metrics (parent_id label) are exercised
        ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
        groupStatus2.setFlowFilesReceived(5);
        groupStatus2.setBytesReceived(10000);
        groupStatus2.setFlowFilesSent(10);
        groupStatus2.setBytesSent(20000);
        groupStatus2.setQueuedCount(100);
        groupStatus2.setQueuedContentSize(1024L);
        groupStatus2.setActiveThreadCount(2);
        groupStatus2.setBytesRead(12345L);
        groupStatus2.setBytesWritten(11111L);
        groupStatus2.setFlowFilesTransferred(5);
        groupStatus2.setBytesTransferred(10000);
        groupStatus2.setOutputContentSize(1000L);
        groupStatus2.setInputContentSize(1000L);
        groupStatus2.setOutputCount(100);
        groupStatus2.setInputCount(1000);
        groupStatus2.setId("3378");
        groupStatus2.setName("nestedPG");
        Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
        nestedGroupStatuses.add(groupStatus2);
        rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
    }

    // Verifies the scrape output for both the root and nested groups, then renames
    // the root group and verifies that subsequent scrapes reflect the new label
    // (i.e. metrics are re-evaluated per scrape rather than cached).
    @Test
    public void testOnTrigger() throws IOException, InitializationException {
        testedReportingTask.initialize(reportingInitContextStub);
        testedReportingTask.onScheduled(configurationContextStub);
        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
        testedReportingTask.onTrigger(reportingContextStub);
        String content = getMetrics();
        assertTrue(content.contains(
                "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        assertTrue(content.contains(
                "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        assertTrue(content.contains(
                "nifi_amount_threads_active{instance=\"localhost\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0"));
        // Rename the component
        rootGroupStatus.setName("rootroot");
        content = getMetrics();
        assertFalse(content.contains(
                "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        assertFalse(content.contains(
                "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        assertTrue(content.contains(
                "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        assertTrue(content.contains(
                "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
        // Best-effort cleanup so the port is freed for other tests; failures ignored.
        try {
            testedReportingTask.OnStopped();
        } catch (Exception e) {
            // Ignore
        }
    }

    // Fetches the /metrics endpoint and returns its body.
    // NOTE(review): the endpoint is requested twice — once via HttpURLConnection for
    // the status assertion and again via HttpClient for the body; a single request
    // would suffice. Left as-is to avoid changing test behavior.
    private String getMetrics() throws IOException {
        HttpURLConnection con = (HttpURLConnection) URI.create("http://localhost:9092/metrics").toURL().openConnection();
        con.setRequestMethod("GET");
        int status = con.getResponseCode();
        assertEquals(HttpURLConnection.HTTP_OK, status);
        HttpClient client = HttpClientBuilder.create().build();
        HttpGet request = new HttpGet("http://localhost:9092/metrics");
        HttpResponse response = client.execute(request);
        HttpEntity entity = response.getEntity();
        return EntityUtils.toString(entity);
    }

    // A second task on the same port must fail onScheduled with ProcessException
    // (address already in use), per the task's documented startup contract.
    @Test
    public void testTwoInstances() throws InitializationException {
        testedReportingTask.initialize(reportingInitContextStub);
        testedReportingTask.onScheduled(configurationContextStub);
        PrometheusReportingTask testedReportingTask2 = new PrometheusReportingTask();
        testedReportingTask2.initialize(reportingInitContextStub);
        try {
            testedReportingTask2.onScheduled(configurationContextStub);
            fail("Should have reported Address In Use");
        } catch (ProcessException pe) {
            // Do nothing, this is the expected behavior
        }
    }
}

View File

@ -1,468 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.util.StringUtils.EMPTY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestPrometheusMetricsUtil {
private static final long DEFAULT_PREDICTION_VALUE = -1L;
private static final double EXPECTED_DEFAULT_PREDICTION_VALUE = -1.0;
private static final double EXPECTED_BACKPRESSURE_PREDICTION_VALUE = 0.0;
private static final double EXPECTED_FALSE_BACKPRESSURE = 0.0;
private static final double EXPECTED_TRUE_BACKPRESSURE = 1.0;
private static final double EXPECTED_DEFAULT_PERCENT_USED_VALUE = 0.0;
private static final double EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE = 100.0;
private static final double EXPECTED_NESTED_BYTES_PERCENT_VALUE = 150.0 / 200.0 * 100.0;
private static final double EXPECTED_NESTED_COUNT_PERCENT_VALUE = 5.0 / 30.0 * 100.0;
private static final String NIFI_PERCENT_USED_BYTES = "nifi_percent_used_bytes";
private static final String NIFI_PERCENT_USED_COUNT = "nifi_percent_used_count";
private static final String BYTES_AT_BACKPRESSURE = "bytesAtBackpressure";
private static final String COUNT_AT_BACKPRESSURE = "countAtBackpressure";
private static final String NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION = "nifi_time_to_bytes_backpressure_prediction";
private static final String NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION = "nifi_time_to_count_backpressure_prediction";
private static final String CONNECTION_1 = "Connection1";
private static final String CONNECTION_2 = "Connection2";
private static final String CONNECTION_3 = "Connection3";
private static final String CONNECTION_4 = "Connection4";
private static final String TIME_TO_BYTES_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
private static final String METRIC_NAME_SEGMENT_FOR_REPOSITORIES = "repo";
private static final String LABEL_NAME_FOR_REPO_IDENTIFIER = "repo_identifier";
private static final String FLOW_FILE_REPO_IDENTIFIER = "flowFileRepo";
private static final String CONTENT_REPO_IDENTIFIER_ONE = "contentRepo1";
private static final String CONTENT_REPO_IDENTIFIER_TWO = "contentRepo2";
private static final String PROVENANCE_REPO_IDENTIFIER = "provenanceRepo";
private static ProcessGroupStatus singleProcessGroupStatus;
private static ProcessGroupStatus nestedProcessGroupStatus;
private static ProcessGroupStatus singleProcessGroupStatusWithBytesBackpressure;
private static ProcessGroupStatus nestedProcessGroupStatusWithCountBackpressure;
private static Set<String> connections;
private static Map<String, Map<String, Long>> mixedValuedPredictions;
private static Map<String, Map<String, Long>> defaultValuedPredictions;
@BeforeAll
public static void setup() {
    // Build the shared fixtures once: single and nested group statuses, variants
    // sitting at the bytes/count backpressure thresholds, the connection-id set,
    // and prediction maps with mixed vs. default-only values.
    singleProcessGroupStatus = createSingleProcessGroupStatus(0, 1, 0, 1);
    nestedProcessGroupStatus = createNestedProcessGroupStatus();
    singleProcessGroupStatusWithBytesBackpressure = createSingleProcessGroupStatus(1, 1, 0, 1);
    nestedProcessGroupStatusWithCountBackpressure = createNestedProcessGroupStatusWithCountBackpressure();
    connections = createConnections();
    mixedValuedPredictions = createPredictionsWithMixedValue();
    defaultValuedPredictions = createPredictionsWithDefaultValuesOnly();
}
@Test
public void testAggregatePercentUsedWithSingleProcessGroup() {
    // A single group below its thresholds: default percent-used values, no backpressure.
    final Map<String, Double> metrics = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, metrics);

    assertEquals(4, metrics.size());
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, metrics.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, metrics.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, metrics.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, metrics.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregatePercentUsedWithSingleProcessGroupWithBytesBackpressure() {
    // The bytes dimension is saturated (queued == threshold), so only the
    // bytes metrics report backpressure; the count dimension stays at defaults.
    final Map<String, Double> aggregatedMetrics = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatusWithBytesBackpressure, aggregatedMetrics);
    assertEquals(4, aggregatedMetrics.size());
    assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregatePercentUsedWithNestedProcessGroups() {
    // Aggregation over nested groups keeps the maximum percent-used found
    // anywhere in the hierarchy; neither child is at backpressure here.
    final Map<String, Double> aggregatedMetrics = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, aggregatedMetrics);
    assertEquals(4, aggregatedMetrics.size());
    assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_NESTED_COUNT_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregatePercentUsedWithNestedProcessGroupsWithCountBackpressure() {
    // One nested group saturates the count dimension, so the count metrics
    // report backpressure while the bytes dimension keeps the nested maximum.
    final Map<String, Double> aggregatedMetrics = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatusWithCountBackpressure, aggregatedMetrics);
    assertEquals(4, aggregatedMetrics.size());
    assertEquals(EXPECTED_NESTED_BYTES_PERCENT_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_BYTES));
    assertEquals(EXPECTED_BACKPRESSURE_PERCENT_USED_VALUE, aggregatedMetrics.get(NIFI_PERCENT_USED_COUNT));
    assertEquals(EXPECTED_FALSE_BACKPRESSURE, aggregatedMetrics.get(BYTES_AT_BACKPRESSURE));
    assertEquals(EXPECTED_TRUE_BACKPRESSURE, aggregatedMetrics.get(COUNT_AT_BACKPRESSURE));
}
@Test
public void testAggregateConnectionPredictionsWithMixedValues() {
    // With mixed predictions, the smallest positive time-to-backpressure per
    // dimension (1ms for bytes, 2ms for count) wins the aggregation.
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
    assertEquals(2, aggregatedMetrics.size());
    assertEquals(1.0, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregateConnectionPredictionsWithAllDefaultValues() {
    // When every connection reports only the default sentinel, the
    // aggregated predictions stay at the default value for both dimensions.
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions);
    assertEquals(2, aggregatedMetrics.size());
    assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregateConnectionPredictionsWithBackpressure() {
    // Pre-seeding bytes-at-backpressure forces the bytes prediction to the
    // backpressure value; the count dimension still aggregates normally.
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0);
    aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0);
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
    assertEquals(EXPECTED_BACKPRESSURE_PREDICTION_VALUE, aggregatedMetrics.get(NIFI_TIME_TO_BYTES_BACKPRESSURE_PREDICTION));
    assertEquals(2.0, aggregatedMetrics.get(NIFI_TIME_TO_COUNT_BACKPRESSURE_PREDICTION));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithAnalyticsNotSet() {
    // With no aggregated metrics at all, datapoint creation must still emit
    // both prediction samples, each at the default prediction value.
    ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
    Map<String, Double> emptyAggregatedMetrics = new HashMap<>();
    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
            emptyAggregatedMetrics,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry);
    assertTrue(emptyAggregatedMetrics.isEmpty());
    assertEquals(2, sampleValues.size());
    for (final Double sampleValue : sampleValues) {
        assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, sampleValue);
    }
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithAllDefaultValues() {
    // Default-only aggregated predictions produce two samples, both at the
    // default prediction value.
    ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, defaultValuedPredictions);
    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
            aggregatedMetrics,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry);
    assertEquals(2, aggregatedMetrics.size());
    assertEquals(2, sampleValues.size());
    for (final Double sampleValue : sampleValues) {
        assertEquals(EXPECTED_DEFAULT_PREDICTION_VALUE, sampleValue);
    }
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithMixedValues() {
    // Mixed predictions yield the per-dimension minima (1.0 bytes, 2.0 count)
    // as the two emitted sample values.
    ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
            aggregatedMetrics,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry);
    assertEquals(2, aggregatedMetrics.size());
    assertEquals(2, sampleValues.size());
    assertTrue(sampleValues.containsAll(List.of(1.0, 2.0)));
}
@Test
public void testAggregatedConnectionPredictionsDatapointCreationWithBackpressure() {
    // Bytes already at backpressure drives its sample to 0.0; the count
    // dimension keeps its aggregated minimum of 2.0.
    ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
    Map<String, Double> aggregatedMetrics = new HashMap<>();
    aggregatedMetrics.put(BYTES_AT_BACKPRESSURE, 1.0);
    aggregatedMetrics.put(COUNT_AT_BACKPRESSURE, 0.0);
    generateConnectionAnalyticMetricsAggregation(aggregatedMetrics, mixedValuedPredictions);
    PrometheusMetricsUtil.createAggregatedConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry,
            aggregatedMetrics,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(connectionAnalyticsMetricsRegistry);
    assertEquals(2, sampleValues.size());
    assertTrue(sampleValues.containsAll(List.of(0.0, 2.0)));
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithoutResults() {
    // With no aggregated results the NiFi metrics datapoints still exist,
    // both at the default percent-used value.
    NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
    Map<String, Double> emptyAggregatedMetrics = new HashMap<>();
    PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry,
            emptyAggregatedMetrics,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(niFiMetricsRegistry);
    assertTrue(emptyAggregatedMetrics.isEmpty());
    assertEquals(2, sampleValues.size());
    for (final Double sampleValue : sampleValues) {
        assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, sampleValue);
    }
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithSingleProcessGroup() {
    // A single empty-queue group produces two samples at the default
    // percent-used value.
    NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
    Map<String, Double> result = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(singleProcessGroupStatus, result);
    PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry,
            result,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(niFiMetricsRegistry);
    assertEquals(2, sampleValues.size());
    for (final Double sampleValue : sampleValues) {
        assertEquals(EXPECTED_DEFAULT_PERCENT_USED_VALUE, sampleValue);
    }
}
@Test
public void testAggregatedNifiMetricsDatapointCreationWithNestedProcessGroup() {
    // The nested hierarchy's maximum bytes/count percentages become the two
    // emitted sample values.
    NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
    Map<String, Double> result = new HashMap<>();
    PrometheusMetricsUtil.aggregatePercentUsed(nestedProcessGroupStatus, result);
    PrometheusMetricsUtil.createAggregatedNifiMetrics(niFiMetricsRegistry,
            result,
            EMPTY,
            EMPTY,
            EMPTY,
            EMPTY);
    List<Double> sampleValues = getSampleValuesList(niFiMetricsRegistry);
    assertEquals(2, sampleValues.size());
    assertTrue(sampleValues.containsAll(List.of(EXPECTED_NESTED_BYTES_PERCENT_VALUE, EXPECTED_NESTED_COUNT_PERCENT_VALUE)));
}
@Test
public void testStorageUsageAddedToNifiMetrics() {
    // Storage metrics must carry one repo_identifier label per repository:
    // flow file repo, both content repos, and the provenance repo.
    final NiFiMetricsRegistry niFiMetricsRegistry = new NiFiMetricsRegistry();
    // NOTE(review): "floe"/"Flo" look like typos of "flow"; kept as-is to match
    // the helper method's existing name.
    final StorageUsage floeFileRepositoryUsage = createFloFileRepositoryUsage();
    final Map<String, StorageUsage> contentRepositoryUsage = createContentRepositoryUsage();
    final Map<String, StorageUsage> provenanceRepositoryUsage = createProvenanceRepositoryUsage();
    PrometheusMetricsUtil.createStorageUsageMetrics(niFiMetricsRegistry, floeFileRepositoryUsage, contentRepositoryUsage, provenanceRepositoryUsage,
            EMPTY, EMPTY, EMPTY, EMPTY, EMPTY);
    final Set<String> result = getRepoIdentifierSampleLabelNames(niFiMetricsRegistry);
    assertEquals(4, result.size());
    assertTrue(result.containsAll(List.of(FLOW_FILE_REPO_IDENTIFIER, CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO, PROVENANCE_REPO_IDENTIFIER)));
}
/**
 * Builds a ProcessGroupStatus containing exactly one connection configured
 * with the given queued amounts and backpressure thresholds.
 */
private static ProcessGroupStatus createSingleProcessGroupStatus(final long queuedBytes, final long bytesThreshold, final int queuedCount, final long objectThreshold) {
    final ConnectionStatus connection = new ConnectionStatus();
    connection.setQueuedBytes(queuedBytes);
    connection.setBackPressureBytesThreshold(bytesThreshold);
    connection.setQueuedCount(queuedCount);
    connection.setBackPressureObjectThreshold(objectThreshold);

    final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
    connectionStatuses.add(connection);

    final ProcessGroupStatus groupStatus = new ProcessGroupStatus();
    groupStatus.setConnectionStatus(connectionStatuses);
    return groupStatus;
}
/**
 * Builds a three-level hierarchy (root -> outer -> inner) where neither
 * child group is at backpressure.
 */
private static ProcessGroupStatus createNestedProcessGroupStatus() {
    final ProcessGroupStatus innerGroup = createSingleProcessGroupStatus(150, 200, 5, 30);
    final ProcessGroupStatus outerGroup = createSingleProcessGroupStatus(15, 100, 10, 200);
    outerGroup.setProcessGroupStatus(Collections.singletonList(innerGroup));

    final ProcessGroupStatus rootGroup = new ProcessGroupStatus();
    rootGroup.setProcessGroupStatus(Collections.singletonList(outerGroup));
    return rootGroup;
}
/**
 * Builds a three-level hierarchy (root -> outer -> inner) where the outer
 * group's connection is at count backpressure (queuedCount == objectThreshold).
 */
private static ProcessGroupStatus createNestedProcessGroupStatusWithCountBackpressure() {
    final ProcessGroupStatus innerGroup = createSingleProcessGroupStatus(150, 200, 5, 30);
    final ProcessGroupStatus outerGroup = createSingleProcessGroupStatus(15, 100, 1, 1);
    outerGroup.setProcessGroupStatus(Collections.singletonList(innerGroup));

    final ProcessGroupStatus rootGroup = new ProcessGroupStatus();
    rootGroup.setProcessGroupStatus(Collections.singletonList(outerGroup));
    return rootGroup;
}
/**
 * Builds prediction fixtures where each connection reports a different
 * combination of time-to-backpressure values: Long.MAX_VALUE (backpressure
 * never expected), the default sentinel, and small concrete predictions.
 * The per-connection maps are read-only for the tests.
 */
private static Map<String, Map<String, Long>> createPredictionsWithMixedValue() {
    final Map<String, Map<String, Long>> predictions = new HashMap<>();
    // Map.of replaces the former double-brace initialization, which created a
    // throwaway anonymous subclass per entry (a well-known Java anti-pattern).
    predictions.put(CONNECTION_1, Map.of(
            TIME_TO_BYTES_BACKPRESSURE_MILLIS, Long.MAX_VALUE,
            TIME_TO_COUNT_BACKPRESSURE_MILLIS, Long.MAX_VALUE));
    predictions.put(CONNECTION_2, Map.of(
            TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE,
            TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE));
    predictions.put(CONNECTION_3, Map.of(
            TIME_TO_BYTES_BACKPRESSURE_MILLIS, 1L,
            TIME_TO_COUNT_BACKPRESSURE_MILLIS, 4L));
    predictions.put(CONNECTION_4, Map.of(
            TIME_TO_BYTES_BACKPRESSURE_MILLIS, 3L,
            TIME_TO_COUNT_BACKPRESSURE_MILLIS, 2L));
    return predictions;
}
/**
 * Builds prediction fixtures where all four connections share a single map
 * holding only the default sentinel prediction for both dimensions.
 */
private static Map<String, Map<String, Long>> createPredictionsWithDefaultValuesOnly() {
    // Map.of replaces the former double-brace initialization anti-pattern;
    // the shared map is only ever read by the aggregation under test.
    final Map<String, Long> defaultPredictions = Map.of(
            TIME_TO_BYTES_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE,
            TIME_TO_COUNT_BACKPRESSURE_MILLIS, DEFAULT_PREDICTION_VALUE);
    final Map<String, Map<String, Long>> predictions = new HashMap<>();
    predictions.put(CONNECTION_1, defaultPredictions);
    predictions.put(CONNECTION_2, defaultPredictions);
    predictions.put(CONNECTION_3, defaultPredictions);
    predictions.put(CONNECTION_4, defaultPredictions);
    return predictions;
}
/** Returns the fixture set of the four connection identifiers. */
private static Set<String> createConnections() {
    final Set<String> connectionIds = new HashSet<>();
    Collections.addAll(connectionIds, CONNECTION_1, CONNECTION_2, CONNECTION_3, CONNECTION_4);
    return connectionIds;
}
// Looks up one connection's prediction map from the fixture; may return null
// if the connection id is absent (the fixtures always contain all four).
private Map<String, Long> getPredictions(final Map<String, Map<String, Long>> predictions, final String connection) {
    return predictions.get(connection);
}
/**
 * Flattens every sample value from every metric family in the given registry
 * into a single list.
 */
private List<Double> getSampleValuesList(final AbstractMetricsRegistry metricsRegistry) {
    return Collections.list(metricsRegistry.getRegistry().metricFamilySamples()).stream()
            .flatMap(family -> family.samples.stream().map(sample -> sample.value))
            .collect(Collectors.toList());
}
/**
 * Folds each fixture connection's predictions into the shared aggregate map
 * via PrometheusMetricsUtil.
 */
private void generateConnectionAnalyticMetricsAggregation(final Map<String, Double> aggregatedMetrics, final Map<String, Map<String, Long>> predictions) {
    connections.forEach(connection ->
            PrometheusMetricsUtil.aggregateConnectionPredictionMetrics(aggregatedMetrics, getPredictions(predictions, connection)));
}
// Builds the flow file repository StorageUsage fixture.
// NOTE(review): "FloFile" is likely a typo for "FlowFile"; the name is kept
// because the test above calls the method under this name.
private StorageUsage createFloFileRepositoryUsage() {
    return createStorageUsage(FLOW_FILE_REPO_IDENTIFIER);
}
// Builds StorageUsage fixtures for both content repositories, keyed by identifier.
private Map<String, StorageUsage> createContentRepositoryUsage() {
    return createStorageUsages(CONTENT_REPO_IDENTIFIER_ONE, CONTENT_REPO_IDENTIFIER_TWO);
}
// Builds the StorageUsage fixture for the provenance repository, keyed by identifier.
private Map<String, StorageUsage> createProvenanceRepositoryUsage() {
    return createStorageUsages(PROVENANCE_REPO_IDENTIFIER);
}
/**
 * Builds one StorageUsage with fixed free/total space (1 of 2 bytes) and the
 * given repository identifier.
 */
private StorageUsage createStorageUsage(final String repoIdentifier) {
    final StorageUsage usage = new StorageUsage();
    usage.setIdentifier(repoIdentifier);
    usage.setFreeSpace(1L);
    usage.setTotalSpace(2L);
    return usage;
}
/**
 * Builds a StorageUsage per identifier, returned as a map keyed by that
 * identifier.
 */
private Map<String, StorageUsage> createStorageUsages(final String... repoIdentifier) {
    final Map<String, StorageUsage> usagesByName = new HashMap<>();
    List.of(repoIdentifier).forEach(name -> usagesByName.put(name, createStorageUsage(name)));
    return usagesByName;
}
// Collects the repo_identifier label value of every sample whose metric name
// contains the repository segment, across all families in the registry.
private Set<String> getRepoIdentifierSampleLabelNames(final AbstractMetricsRegistry metricsRegistry) {
    return Collections.list(metricsRegistry.getRegistry().filteredMetricFamilySamples(e -> e.contains(METRIC_NAME_SEGMENT_FOR_REPOSITORIES)))
            // For each matching sample, the label value is located by the index
            // of "repo_identifier" within that sample's label-name list.
            .stream().flatMap(f -> f.samples.stream())
            .map(s -> s.labelValues.get(s.labelNames.indexOf(LABEL_NAME_FOR_REPO_IDENTIFIER)))
            .collect(Collectors.toSet());
}
}

View File

@ -1,158 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPrometheusRecordSink {
private static final String portString = "7077";
@Test
public void testSendData() throws IOException, InitializationException {
PrometheusRecordSink sink = initTask();
List<RecordField> recordFields = Arrays.asList(
new RecordField("field1", RecordFieldType.INT.getDataType()),
new RecordField("field2", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)),
new RecordField("field3", RecordFieldType.STRING.getDataType())
);
RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
Map<String, Object> row1 = new LinkedHashMap<>();
row1.put("field1", 15);
row1.put("field2", BigDecimal.valueOf(12.34567D));
row1.put("field3", "Hello");
Map<String, Object> row2 = new LinkedHashMap<>();
row2.put("field1", 6);
row2.put("field2", BigDecimal.valueOf(0.1234567890123456789D));
row2.put("field3", "WorldÄËÖÜ!");
RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
new MapRecord(recordSchema, row1),
new MapRecord(recordSchema, row2)
));
Map<String, String> attributes = new LinkedHashMap<>();
attributes.put("a", "Hello");
WriteResult writeResult = sink.sendData(recordSet, attributes, true);
assertNotNull(writeResult);
assertEquals(2, writeResult.getRecordCount());
assertEquals("Hello", writeResult.getAttributes().get("a"));
final String content = getMetrics();
assertTrue(content.contains("field1{field3=\"Hello\",} 15.0\nfield1{field3=\"WorldÄËÖÜ!\",} 6.0\n")
|| content.contains("field1{field3=\"WorldÄËÖÜ!\",} 6.0\nfield1{field3=\"Hello\",} 15.0\n"));
assertTrue(content.contains("field2{field3=\"Hello\",} 12.34567\nfield2{field3=\"WorldÄËÖÜ!\",} 0.12345678901234568\n")
|| content.contains("field2{field3=\"WorldÄËÖÜ!\",} 0.12345678901234568\nfield2{field3=\"Hello\",} 12.34567\n"));
try {
sink.onStopped();
} catch (Exception e) {
// Do nothing, just need to shut down the server before the next run
}
}
@Test
public void testTwoInstances() throws Exception {
PrometheusRecordSink sink1 = initTask();
assertThrows(ProcessException.class, this::initTask);
sink1.onStopped();
}
private String getMetrics() throws IOException {
HttpURLConnection con = (HttpURLConnection) URI.create("http://localhost:" + portString + "/metrics").toURL().openConnection();
con.setRequestMethod("GET");
int status = con.getResponseCode();
assertEquals(HttpURLConnection.HTTP_OK, status);
HttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet("http://localhost:" + portString + "/metrics");
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
return EntityUtils.toString(entity);
}
private PrometheusRecordSink initTask() throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
final PrometheusRecordSink task = new PrometheusRecordSink();
ConfigurationContext context = mock(ConfigurationContext.class);
final StateManager stateManager = new MockStateManager(task);
final Map<String, String> variableRegistry = new HashMap<String, String>();
final PropertyValue pValue = mock(StandardPropertyValue.class);
variableRegistry.put("port", portString);
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue("${port}", null, variableRegistry));
when(context.getProperty(PrometheusRecordSink.SSL_CONTEXT)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
task.initialize(initContext);
task.onScheduled(context);
return task;
}
}

View File

@ -1,31 +0,0 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-bom</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../nifi-standard-services-api-bom</relativePath>
</parent>
<artifactId>nifi-prometheus-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-prometheus-reporting-task</module>
<module>nifi-prometheus-nar</module>
</modules>
</project>

View File

@ -27,14 +27,5 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${yammer.metrics.version}</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,261 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.ganglia;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.GangliaReporter;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Configuration of this reporting task requires a "host" property that points
* to the Ganglia server and optionally allows a "port" property (default
* otherwise is 8649)
*/
@Tags({"ganglia", "stats"})
@CapabilityDescription("Reports metrics to Ganglia so that Ganglia can be used for external monitoring of the application. Metrics"
+ " reported include JVM Metrics (optional); the following 5-minute NiFi statistics: FlowFiles Received, Bytes Received,"
+ " FlowFiles Sent, Bytes Sent, Bytes Read, Bytes Written, Total Task Duration; and the current values for"
+ " FlowFiles Queued, Bytes Queued, and number of Active Threads.")
public class StandardGangliaReporter extends AbstractReportingTask {

    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Hostname")
            .description("The fully-qualified name of the host on which Ganglia is running")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("localhost")
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Port")
            .description("The Port on which Ganglia is listening for incoming connections")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .defaultValue("8649")
            .build();
    public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
            .name("Send JVM Metrics")
            .description("Specifies whether or not JVM Metrics should be gathered and sent, in addition to NiFi-specific metrics")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    public static final String METRICS_GROUP = "NiFi";

    private MetricsRegistry metricsRegistry;
    private GangliaReporter gangliaReporter;
    // Latest root-group snapshot, written by onTrigger and read by the gauges.
    private final AtomicReference<ProcessGroupStatus> latestStatus = new AtomicReference<>();

    /** Extracts one metric value from a ProcessGroupStatus snapshot. */
    @FunctionalInterface
    private interface StatusMetric<T> {
        T get(ProcessGroupStatus status);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(HOSTNAME);
        properties.add(PORT);
        properties.add(SEND_JVM_METRICS);
        return properties;
    }

    /**
     * Registers the NiFi gauges and creates the GangliaReporter. Previously
     * each gauge was a ten-line anonymous class; the shared registerGauge
     * helper removes that duplication while preserving the null-to-default
     * handling the original gauges performed.
     *
     * @param config the reporting task configuration
     * @throws InitializationException if the Ganglia reporter cannot be created
     */
    @OnScheduled
    public void onConfigure(final ConfigurationContext config) throws InitializationException {
        metricsRegistry = new MetricsRegistry();

        registerGauge("int32", "FlowFiles Received Last 5 mins", 0, ProcessGroupStatus::getFlowFilesReceived);
        registerGauge("int64", "Bytes Received Last 5 mins", 0L, ProcessGroupStatus::getBytesReceived);
        registerGauge("int32", "FlowFiles Sent Last 5 mins", 0, ProcessGroupStatus::getFlowFilesSent);
        registerGauge("int64", "Bytes Sent Last 5 mins", 0L, ProcessGroupStatus::getBytesSent);
        registerGauge("int32", "FlowFiles Queued", 0, ProcessGroupStatus::getQueuedCount);
        registerGauge("int64", "Bytes Queued", 0L, ProcessGroupStatus::getQueuedContentSize);
        registerGauge("int64", "Bytes Read (5 mins)", 0L, ProcessGroupStatus::getBytesRead);
        registerGauge("int64", "Bytes Written (5 mins)", 0L, ProcessGroupStatus::getBytesWritten);
        registerGauge("int32", "Active Threads", 0, ProcessGroupStatus::getActiveThreadCount);
        registerGauge("int32", "Total Task Duration Seconds", 0,
                status -> (int) TimeUnit.NANOSECONDS.toSeconds(calculateProcessingNanos(status)));

        final String gangliaHost = config.getProperty(HOSTNAME).getValue();
        final int port = config.getProperty(PORT).asInteger();

        try {
            gangliaReporter = new GangliaReporter(metricsRegistry, gangliaHost, port, METRICS_GROUP) {
                @Override
                protected String sanitizeName(MetricName name) {
                    // Report the plain metric name, not the sanitized default.
                    return name.getName();
                }
            };
            gangliaReporter.printVMMetrics = config.getProperty(SEND_JVM_METRICS).asBoolean();
        } catch (final IOException e) {
            throw new InitializationException(e);
        }
    }

    /**
     * Registers a gauge that reads the latest status snapshot, falling back to
     * the given default when no snapshot exists yet or the extracted value is null.
     *
     * @param type the Ganglia type label ("int32" or "int64")
     * @param name the metric name
     * @param defaultValue value reported before the first snapshot or on null
     * @param metric extractor applied to the latest ProcessGroupStatus
     */
    private <T> void registerGauge(final String type, final String name, final T defaultValue, final StatusMetric<T> metric) {
        metricsRegistry.newGauge(new MetricName(METRICS_GROUP, type, name), new Gauge<T>() {
            @Override
            public T value() {
                final ProcessGroupStatus status = latestStatus.get();
                if (status == null) {
                    return defaultValue;
                }
                final T value = metric.get(status);
                return value == null ? defaultValue : value;
            }
        });
    }

    /**
     * Publishes the latest controller status snapshot and pushes all
     * registered metrics to Ganglia.
     */
    @Override
    public void onTrigger(final ReportingContext context) {
        final ProcessGroupStatus rootGroupStatus = context.getEventAccess().getControllerStatus();
        this.latestStatus.set(rootGroupStatus);
        gangliaReporter.run();
        getLogger().info("{} Sent metrics to Ganglia", this);
    }

    /**
     * Sums processing nanos over every processor in the group and,
     * recursively, in all nested groups.
     */
    private long calculateProcessingNanos(final ProcessGroupStatus status) {
        long nanos = 0L;
        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
            nanos += procStats.getProcessingNanos();
        }
        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
            nanos += calculateProcessingNanos(childGroupStatus);
        }
        return nanos;
    }
}

View File

@ -15,4 +15,3 @@
org.apache.nifi.controller.ControllerStatusReportingTask
org.apache.nifi.controller.MonitorDiskUsage
org.apache.nifi.controller.MonitorMemory
org.apache.nifi.reporting.ganglia.StandardGangliaReporter

View File

@ -1,41 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>StandardGangliaReporter</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Description:</h2>
<p>Reporting Task that reports metrics to a Ganglia server. The following metrics are reported:
</p>
<ul>
<li><strong>FlowFiles In (5 mins)</strong>: The number of FlowFiles received via Site-to-Site in the last 5 minutes</li>
<li><strong>Bytes In (5 mins)</strong>: The number of bytes received via Site-to-Site in the last 5 minutes</li>
<li><strong>FlowFiles Out (5 mins)</strong>: The number of FlowFiles pulled from Output Ports via Site-to-Site in the last 5 minutes</li>
<li><strong>Bytes Out (5 mins)</strong>: The number of bytes pulled from Output Ports via Site-to-Site in the last 5 minutes</li>
<li><strong>Bytes Read (5 mins)</strong>: The number of bytes read from disk by NiFi in the last 5 minutes</li>
<li><strong>Bytes Written (5 mins)</strong>: The number of bytes written to disk by NiFi in the last 5 minutes</li>
<li><strong>FlowFiles Queued</strong>: The total number of FlowFiles currently queued on the system at the point in time at which the Reporting Task is run</li>
<li><strong>Bytes Queued</strong>: The total number of bytes allocated by the FlowFiles that are currently queued on the system at the point in time at which the Reporting Task is run</li>
<li><strong>Active Threads</strong>: The number of threads actively running at the point in time at which the Reporting Task is run</li>
</ul>
</body>
</html>

View File

@ -33,7 +33,6 @@
<module>nifi-standard-nar</module>
</modules>
<properties>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
<org.apache.sshd.version>2.13.1</org.apache.sshd.version>
<tika.version>2.9.2</tika.version>
</properties>
@ -70,11 +69,6 @@
<type>war</type>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
<version>${yammer.metrics.version}</version>
</dependency>
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>sshj</artifactId>

View File

@ -72,7 +72,6 @@
<module>nifi-extension-utils</module>
<module>nifi-redis-bundle</module>
<module>nifi-network-bundle</module>
<module>nifi-prometheus-bundle</module>
<module>nifi-sql-reporting-bundle</module>
<module>nifi-hazelcast-bundle</module>
<module>nifi-asn1-bundle</module>

View File

@ -458,11 +458,6 @@
<artifactId>nifi-authorizer</artifactId>
<scope>provided</scope> <!-- expected to be provided by parent classloader -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.Gauge;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.Gauge;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.Gauge;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.Gauge;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.Gauge;

View File

@ -15,11 +15,10 @@
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
package org.apache.nifi.prometheusutil;
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -28,10 +27,8 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.Map;
@ -39,59 +36,15 @@ import java.util.concurrent.TimeUnit;
public class PrometheusMetricsUtil {
public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
"Send rollup metrics for the entire root process group");
public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
"Send metrics for each process group");
public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
"Send metrics for each component in the system, to include processors, connections, controller services, etc.");
private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
protected static final String DEFAULT_LABEL_STRING = "";
private static final double MAXIMUM_BACKPRESSURE = 1.0;
private static final double UNDEFINED_BACKPRESSURE = -1.0;
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication",
"ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication",
"ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore"
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-endpoint-port")
.displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("9092")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-instance-id")
.displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetricsRegistry, ProcessGroupStatus status,
String instId, String parentProcessGroupId, String compType, String metricsStrategy) {

View File

@ -136,13 +136,13 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.prometheusutil.AbstractMetricsRegistry;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
import org.apache.nifi.prometheusutil.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheusutil.JvmMetricsRegistry;
import org.apache.nifi.prometheusutil.NiFiMetricsRegistry;
import org.apache.nifi.prometheusutil.PrometheusMetricsUtil;
import org.apache.nifi.registry.flow.FlowLocation;
import org.apache.nifi.registry.flow.FlowRegistryBranch;
import org.apache.nifi.registry.flow.FlowRegistryBucket;

View File

@ -16,12 +16,12 @@
*/
package org.apache.nifi.web.api.request;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheusutil.AbstractMetricsRegistry;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
import org.apache.nifi.prometheusutil.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheusutil.JvmMetricsRegistry;
import org.apache.nifi.prometheusutil.NiFiMetricsRegistry;
/**
* Flow Metrics Registries

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.web.util;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheusutil.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.web.controller.ControllerFacade;
import java.util.Collection;

View File

@ -19,8 +19,8 @@ package org.apache.nifi.web.util;
import jakarta.ws.rs.WebApplicationException;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.prometheusutil.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheusutil.PrometheusMetricsUtil;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.controller.ControllerFacade;

View File

@ -27,12 +27,12 @@ import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
import org.apache.nifi.prometheusutil.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheusutil.JvmMetricsRegistry;
import org.apache.nifi.prometheusutil.NiFiMetricsRegistry;
import org.apache.nifi.prometheusutil.PrometheusMetricsUtil;
import org.apache.nifi.registry.flow.FlowVersionLocation;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;

View File

@ -109,25 +109,6 @@ class TestRuntimeManifest {
assertEquals(1, lineStartPatternDependency.getDependentValues().size());
assertEquals("Single file", lineStartPatternDependency.getDependentValues().get(0));
// Verify PrometheusReportingTask definition which also has @DefaultSchedule
final ReportingTaskDefinition prometheusReportingTaskDef = getReportingTaskDefinition(bundles, "nifi-prometheus-nar",
"org.apache.nifi.reporting.prometheus.PrometheusReportingTask");
assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), prometheusReportingTaskDef.getDefaultSchedulingStrategy());
final List<String> prometheusSchedulingStrategies = prometheusReportingTaskDef.getSupportedSchedulingStrategies();
assertNotNull(prometheusSchedulingStrategies);
assertEquals(2, prometheusSchedulingStrategies.size());
assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));
final Map<String, String> prometheusDefaultSchedulingPeriods = prometheusReportingTaskDef.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(prometheusDefaultSchedulingPeriods);
assertEquals(2, prometheusDefaultSchedulingPeriods.size());
// TIMER_DRIVEN period should come from the @DefaultSchedule annotation that overrides the default value
assertEquals(REPORTING_TASK_DEFAULT_SCHEDULE_TIME, prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
final ProcessorDefinition joltTransformDef = getProcessorDefinition(bundles, "nifi-jolt-nar",
"org.apache.nifi.processors.jolt.JoltTransformRecord");
assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), joltTransformDef.getDefaultSchedulingStrategy());