NIFI-2269 Added datadog support

This closes #655

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Ramiz 2016-08-16 13:03:04 +03:00 committed by jpercivall
parent dca3764ed1
commit 376d3c4ef4
21 changed files with 1872 additions and 1 deletions

View File

@ -1487,3 +1487,33 @@ This project bundles 'Jython' which is available under a Python Software Foundat
OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
This project bundles 'metrics-datadog' which is available
under a BSD license.
Copyright (c) 2014, Vistar Media
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of Vistar Media nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -59,6 +59,14 @@ The following binary components are provided under the Apache Software License v
Tatu Saloranta (http://wiki.fasterxml.com/TatuSaloranta)
(ASLv2) Google Guava
The following NOTICE information applies:
Copyright 2011 Guava International Limited
(ASLv2) Dropwizard Metrics
The following NOTICE information applies:
Copyright (c) 2010-2013 Coda Hale, Yammer.com
(ASLv2) Jasypt
The following NOTICE information applies:
Copyright (c) 2007-2010, The JASYPT team (http://www.jasypt.org)

147
nifi-assembly/pom.xml Normal file → Executable file
View File

@ -83,6 +83,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -131,6 +132,12 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-distributed-cache-services-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-datadog-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-nar</artifactId>
@ -367,7 +374,147 @@ language governing permissions and limitations under the License. -->
<type>nar</type>
</dependency>
</dependencies>
<properties>
<!--Wrapper Properties -->
<nifi.jvm.heap.mb>512</nifi.jvm.heap.mb>
<nifi.jvm.permgen.mb>128</nifi.jvm.permgen.mb>
<nifi.run.as />
<!-- nifi.properties: core properties -->
<nifi.version>${project.version}</nifi.version>
<nifi.flowcontroller.autoResumeState>true</nifi.flowcontroller.autoResumeState>
<nifi.flowcontroller.graceful.shutdown.period>10 sec</nifi.flowcontroller.graceful.shutdown.period>
<nifi.flowservice.writedelay.interval>500 ms</nifi.flowservice.writedelay.interval>
<nifi.administrative.yield.duration>30 sec</nifi.administrative.yield.duration>
<nifi.bored.yield.duration>10 millis</nifi.bored.yield.duration>
<nifi.flow.configuration.file>./conf/flow.xml.gz</nifi.flow.configuration.file>
<nifi.flow.configuration.archive.enabled>true</nifi.flow.configuration.archive.enabled>
<nifi.flow.configuration.archive.dir>./conf/archive/</nifi.flow.configuration.archive.dir>
<nifi.flow.configuration.archive.max.time>30 days</nifi.flow.configuration.archive.max.time>
<nifi.flow.configuration.archive.max.storage>500 MB</nifi.flow.configuration.archive.max.storage>
<nifi.login.identity.provider.configuration.file>./conf/login-identity-providers.xml</nifi.login.identity.provider.configuration.file>
<nifi.authorizer.configuration.file>./conf/authorizers.xml</nifi.authorizer.configuration.file>
<nifi.templates.directory>./conf/templates</nifi.templates.directory>
<nifi.database.directory>./database_repository</nifi.database.directory>
<nifi.state.management.configuration.file>./conf/state-management.xml</nifi.state.management.configuration.file>
<nifi.state.management.embedded.zookeeper.start>false</nifi.state.management.embedded.zookeeper.start>
<nifi.state.management.embedded.zookeeper.properties>./conf/zookeeper.properties</nifi.state.management.embedded.zookeeper.properties>
<nifi.state.management.provider.local>local-provider</nifi.state.management.provider.local>
<nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster>
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
<nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory>
<nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>
<nifi.flowfile.repository.checkpoint.interval>2 mins</nifi.flowfile.repository.checkpoint.interval>
<nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync>
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
<nifi.swap.in.period>5 sec</nifi.swap.in.period>
<nifi.swap.in.threads>1</nifi.swap.in.threads>
<nifi.swap.out.period>5 sec</nifi.swap.out.period>
<nifi.swap.out.threads>4</nifi.swap.out.threads>
<nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
<nifi.content.claim.max.appendable.size>10 MB</nifi.content.claim.max.appendable.size>
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
<nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
<nifi.content.repository.archive.max.retention.period>12 hours</nifi.content.repository.archive.max.retention.period>
<nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
<nifi.content.repository.archive.enabled>true</nifi.content.repository.archive.enabled>
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
<nifi.content.viewer.url>/nifi-content-viewer/</nifi.content.viewer.url>
<nifi.restore.directory />
<nifi.ui.banner.text />
<nifi.ui.autorefresh.interval>30 sec</nifi.ui.autorefresh.interval>
<nifi.nar.library.directory>./lib</nifi.nar.library.directory>
<nifi.nar.working.directory>./work/nar/</nifi.nar.working.directory>
<nifi.documentation.working.directory>./work/docs/components</nifi.documentation.working.directory>
<nifi.sensitive.props.algorithm>PBEWITHMD5AND256BITAES-CBC-OPENSSL</nifi.sensitive.props.algorithm>
<nifi.sensitive.props.provider>BC</nifi.sensitive.props.provider>
<nifi.h2.url.append>;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE</nifi.h2.url.append>
<nifi.remote.input.socket.port>9990</nifi.remote.input.socket.port>
<!-- persistent provenance repository properties -->
<nifi.provenance.repository.implementation>org.apache.nifi.provenance.PersistentProvenanceRepository</nifi.provenance.repository.implementation>
<nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default>
<nifi.provenance.repository.max.storage.time>24 hours</nifi.provenance.repository.max.storage.time>
<nifi.provenance.repository.max.storage.size>1 GB</nifi.provenance.repository.max.storage.size>
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
<nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.attributes />
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
<nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length>
<!-- volatile provenance repository properties -->
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>
<!-- Component status repository properties -->
<nifi.components.status.repository.implementation>org.apache.nifi.controller.status.history.VolatileComponentStatusRepository</nifi.components.status.repository.implementation>
<nifi.components.status.repository.buffer.size>1440</nifi.components.status.repository.buffer.size>
<nifi.components.status.snapshot.frequency>1 min</nifi.components.status.snapshot.frequency>
<!-- nifi.properties: web properties -->
<nifi.web.war.directory>./lib</nifi.web.war.directory>
<nifi.web.http.host />
<nifi.web.http.port>8080</nifi.web.http.port>
<nifi.web.https.host />
<nifi.web.https.port />
<nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>
<nifi.web.jetty.threads>200</nifi.web.jetty.threads>
<!-- nifi.properties: security properties -->
<nifi.security.keystore />
<nifi.security.keystoreType />
<nifi.security.keystorePasswd />
<nifi.security.keyPasswd />
<nifi.security.truststore />
<nifi.security.truststoreType />
<nifi.security.truststorePasswd />
<nifi.security.needClientAuth />
<nifi.security.user.authorizer>file-provider</nifi.security.user.authorizer>
<nifi.security.user.login.identity.provider />
<nifi.security.x509.principal.extractor />
<nifi.security.ocsp.responder.url />
<nifi.security.ocsp.responder.certificate />
<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
<!-- nifi.properties: cluster node properties (only configure for cluster nodes) -->
<nifi.cluster.is.node>false</nifi.cluster.is.node>
<nifi.cluster.node.address />
<nifi.cluster.node.protocol.port />
<nifi.cluster.node.protocol.threads>10</nifi.cluster.node.protocol.threads>
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
<nifi.cluster.firewall.file />
<nifi.cluster.request.replication.claim.timeout>15 secs</nifi.cluster.request.replication.claim.timeout>
<!-- nifi.properties: zookeeper properties -->
<nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
<nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
<nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
<nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
<!-- nifi.properties: kerberos properties -->
<nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
<nifi.kerberos.service.principal />
<nifi.kerberos.keytab.location />
<nifi.kerberos.authentication.expiration>12 hours</nifi.kerberos.authentication.expiration>
</properties>
<profiles>
<profile>
<id>rpm</id>

View File

@ -137,7 +137,6 @@ public class AmbariReportingTask extends AbstractReportingTask {
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
}
}
// calculate the current metrics, but store them to be sent next time

View File

@ -0,0 +1,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-datadog-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-datadog-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-datadog-reporting-task</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,239 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
This project bundles 'metrics-datadog' which is available
under a BSD license.
Copyright (c) 2014, Vistar Media
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of Vistar Media nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,46 @@
nifi-ambari-nar
Copyright 2015-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Yammer Metrics
The following NOTICE information applies:
Metrics
Copyright 2010-2012 Coda Hale and Yammer, Inc.
This product includes software developed by Coda Hale and Yammer, Inc.
This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
with the following comments:
Written by Doug Lea with assistance from members of JCP JSR-166
Expert Group and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
(CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
(CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
(CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
(CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
(CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
(CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
(CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
(CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)

View File

@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-datadog-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-datadog-reporting-task</artifactId>
<description>Publishes NiFi metrics to datadog</description>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.coursera</groupId>
<artifactId>metrics-datadog</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog;
import com.codahale.metrics.MetricRegistry;
import org.coursera.metrics.datadog.DatadogReporter;
import org.coursera.metrics.datadog.transport.HttpTransport;
import org.coursera.metrics.datadog.transport.Transport;
import org.coursera.metrics.datadog.transport.UdpTransport;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
/**
* Class configures MetricRegistry (passed outside or created from scratch) with Datadog support
*/
public class DDMetricRegistryBuilder {
private MetricRegistry metricRegistry = null;
private List<String> tags = Arrays.asList();
private DatadogReporter datadogReporter;
private String apiKey = "";
private Transport transport;
public DDMetricRegistryBuilder setMetricRegistry(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
return this;
}
public DDMetricRegistryBuilder setTags(List<String> tags) {
this.tags = tags;
return this;
}
public DatadogReporter getDatadogReporter() {
return datadogReporter;
}
public MetricRegistry build(String apiKey) throws IOException {
if (metricRegistry == null)
metricRegistry = new MetricRegistry();
if (createTransport(apiKey)) {
datadogReporter = createDatadogReporter(this.metricRegistry);
}
return this.metricRegistry;
}
private boolean createTransport(String apiKey) {
//if api key was not changed
if (this.apiKey.equals(apiKey)) {
return false;
} else if (apiKey.equals("agent")) {
this.apiKey = "agent";
transport = new UdpTransport.Builder().build();
return true;
} else {
this.apiKey = apiKey;
transport = new HttpTransport.Builder().withApiKey(apiKey).build();
return true;
}
}
/*
* create DataDog reporter
*/
private DatadogReporter createDatadogReporter(MetricRegistry metricRegistry) throws IOException {
DatadogReporter reporter =
DatadogReporter.forRegistry(metricRegistry)
.withHost(InetAddress.getLocalHost().getHostName())
.withTransport(transport)
.withTags(tags)
.build();
return reporter;
}
}

View File

@ -0,0 +1,281 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.datadog.metrics.MetricsService;
import org.coursera.metrics.datadog.DynamicTagsCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Tags({"reporting", "datadog", "metrics"})
@CapabilityDescription("Publishes metrics from NiFi to datadog")
public class DataDogReportingTask extends AbstractReportingTask {
static final AllowableValue DATADOG_AGENT = new AllowableValue("DataDog Agent", "DataDog Agent",
"Metrics will be sent via locally installed DataDog agent. " +
"DataDog agent needs to be installed manually before using this option");
static final AllowableValue DATADOG_HTTP = new AllowableValue("Datadog HTTP", "Datadog HTTP",
"Metrics will be sent via HTTP transport with no need of Agent installed. " +
"DataDog API key needs to be set");
static final PropertyDescriptor DATADOG_TRANSPORT = new PropertyDescriptor.Builder()
.name("DataDog transport")
.description("Transport through which metrics will be sent to DataDog")
.required(true)
.allowableValues(DATADOG_AGENT, DATADOG_HTTP)
.defaultValue(DATADOG_HTTP.getValue())
.build();
static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
.name("API key")
.description("DataDog API key. If specified value is 'agent', local DataDog agent will be used.")
.expressionLanguageSupported(false)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor METRICS_PREFIX = new PropertyDescriptor.Builder()
.name("Metrics prefix")
.description("Prefix to be added before every metric")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ENVIRONMENT = new PropertyDescriptor.Builder()
.name("Environment")
.description("Environment, dataflow is running in. " +
"This property will be included as metrics tag.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("dev")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private MetricsService metricsService;
private DDMetricRegistryBuilder ddMetricRegistryBuilder;
private MetricRegistry metricRegistry;
private String metricsPrefix;
private String environment;
private String statusId;
private ConcurrentHashMap<String, AtomicDouble> metricsMap;
private Map<String, String> defaultTags;
private volatile VirtualMachineMetrics virtualMachineMetrics;
private Logger logger = LoggerFactory.getLogger(getClass().getName());
@OnScheduled
public void setup(final ConfigurationContext context) {
metricsService = getMetricsService();
ddMetricRegistryBuilder = getMetricRegistryBuilder();
metricRegistry = getMetricRegistry();
metricsMap = getMetricsMap();
metricsPrefix = METRICS_PREFIX.getDefaultValue();
environment = ENVIRONMENT.getDefaultValue();
virtualMachineMetrics = VirtualMachineMetrics.getInstance();
ddMetricRegistryBuilder.setMetricRegistry(metricRegistry)
.setTags(metricsService.getAllTagsList());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(METRICS_PREFIX);
properties.add(ENVIRONMENT);
properties.add(API_KEY);
properties.add(DATADOG_TRANSPORT);
return properties;
}
@Override
public void onTrigger(ReportingContext context) {
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
metricsPrefix = context.getProperty(METRICS_PREFIX).evaluateAttributeExpressions().getValue();
environment = context.getProperty(ENVIRONMENT).evaluateAttributeExpressions().getValue();
statusId = status.getId();
defaultTags = ImmutableMap.of("env", environment, "dataflow_id", statusId);
try {
updateDataDogTransport(context);
} catch (IOException e) {
e.printStackTrace();
}
updateAllMetricGroups(status);
ddMetricRegistryBuilder.getDatadogReporter().report();
}
protected void updateMetrics(Map<String, Double> metrics, Optional<String> processorName, Map<String, String> tags) {
for (Map.Entry<String, Double> entry : metrics.entrySet()) {
logger.info(entry.getKey() + ": " + entry.getValue());
final String metricName = buildMetricName(processorName, entry.getKey());
//if metric is not registered yet - register it
if (!metricsMap.containsKey(metricName)) {
metricsMap.put(metricName, new AtomicDouble(entry.getValue()));
metricRegistry.register(metricName, new MetricGauge(metricName, tags));
}
//set real time value to metrics map
metricsMap.get(metricName).set(entry.getValue());
}
}
private void updateAllMetricGroups (ProcessGroupStatus processGroupStatus) {
final List<ProcessorStatus> processorStatuses = new ArrayList<>();
populateProcessorStatuses(processGroupStatus, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
updateMetrics(metricsService.getProcessorMetrics(processorStatus),
Optional.of(processorStatus.getName()), defaultTags);
}
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(processGroupStatus, connectionStatuses);
for (ConnectionStatus connectionStatus: connectionStatuses) {
Map<String, String> connectionStatusTags = new HashMap<>(defaultTags);
connectionStatusTags.putAll(metricsService.getConnectionStatusTags(connectionStatus));
updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus), Optional.<String>absent(), connectionStatusTags);
}
final List<PortStatus> inputPortStatuses = new ArrayList<>();
populateInputPortStatuses(processGroupStatus, inputPortStatuses);
for (PortStatus portStatus: inputPortStatuses) {
Map<String, String> portTags = new HashMap<>(defaultTags);
portTags.putAll(metricsService.getPortStatusTags(portStatus));
updateMetrics(metricsService.getPortStatusMetrics(portStatus), Optional.<String>absent(), portTags);
}
final List<PortStatus> outputPortStatuses = new ArrayList<>();
populateOutputPortStatuses(processGroupStatus, outputPortStatuses);
for (PortStatus portStatus: outputPortStatuses) {
Map<String, String> portTags = new HashMap<>(defaultTags);
portTags.putAll(metricsService.getPortStatusTags(portStatus));
updateMetrics(metricsService.getPortStatusMetrics(portStatus), Optional.<String>absent(), portTags);
}
updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics),
Optional.<String>absent(), defaultTags);
updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus), Optional.<String>absent(), defaultTags);
}
private class MetricGauge implements Gauge, DynamicTagsCallback {
private Map<String, String> tags;
private String metricName;
public MetricGauge(String metricName, Map<String, String> tagsMap) {
this.tags = tagsMap;
this.metricName = metricName;
}
@Override
public Object getValue() {
return metricsMap.get(metricName).get();
}
@Override
public List<String> getTags() {
List<String> tagsList = Lists.newArrayList();
for (Map.Entry<String, String> entry : tags.entrySet()) {
tagsList.add(entry.getKey() + ":" + entry.getValue());
}
return tagsList;
}
}
private void updateDataDogTransport(ReportingContext context) throws IOException {
String dataDogTransport = context.getProperty(DATADOG_TRANSPORT).getValue();
if (dataDogTransport.equalsIgnoreCase(DATADOG_AGENT.getValue())) {
ddMetricRegistryBuilder.build("agent");
} else if (dataDogTransport.equalsIgnoreCase(DATADOG_HTTP.getValue())
&& context.getProperty(API_KEY).isSet()) {
ddMetricRegistryBuilder.build(context.getProperty(API_KEY).getValue());
}
}
private void populateProcessorStatuses(final ProcessGroupStatus groupStatus, final List<ProcessorStatus> statuses) {
statuses.addAll(groupStatus.getProcessorStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateProcessorStatuses(childGroupStatus, statuses);
}
}
private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
statuses.addAll(groupStatus.getConnectionStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateConnectionStatuses(childGroupStatus, statuses);
}
}
private void populateInputPortStatuses(final ProcessGroupStatus groupStatus, final List<PortStatus> statuses) {
statuses.addAll(groupStatus.getInputPortStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateInputPortStatuses(childGroupStatus, statuses);
}
}
private void populateOutputPortStatuses(final ProcessGroupStatus groupStatus, final List<PortStatus> statuses) {
statuses.addAll(groupStatus.getOutputPortStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateOutputPortStatuses(childGroupStatus, statuses);
}
}
private String buildMetricName(Optional<String> processorName, String metricName) {
return metricsPrefix + "." + processorName.or("flow") + "." + metricName;
}
protected MetricsService getMetricsService() {
return new MetricsService();
}
protected DDMetricRegistryBuilder getMetricRegistryBuilder() {
return new DDMetricRegistryBuilder();
}
protected MetricRegistry getMetricRegistry() {
return new MetricRegistry();
}
protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
return new ConcurrentHashMap<>();
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog.api;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
/**
* Builds the JsonObject for an individual metric.
*/
public class MetricBuilder {
private final JsonBuilderFactory factory;
private String applicationId;
private String instanceId;
private String hostname;
private String timestamp;
private String metricName;
private String metricValue;
public MetricBuilder(final JsonBuilderFactory factory) {
this.factory = factory;
}
public MetricBuilder applicationId(final String applicationId) {
this.applicationId = applicationId;
return this;
}
public MetricBuilder instanceId(final String instanceId) {
this.instanceId = instanceId;
return this;
}
public MetricBuilder hostname(final String hostname) {
this.hostname = hostname;
return this;
}
public MetricBuilder timestamp(final long timestamp) {
this.timestamp = String.valueOf(timestamp);
return this;
}
public MetricBuilder metricName(final String metricName) {
this.metricName = metricName;
return this;
}
public MetricBuilder metricValue(final String metricValue) {
this.metricValue = metricValue;
return this;
}
public JsonObject build() {
return factory.createObjectBuilder()
.add(MetricFields.METRIC_NAME, metricName)
.add(MetricFields.APP_ID, applicationId)
.add(MetricFields.INSTANCE_ID, instanceId)
.add(MetricFields.HOSTNAME, hostname)
.add(MetricFields.TIMESTAMP, timestamp)
.add(MetricFields.START_TIME, timestamp)
.add(MetricFields.METRICS,
factory.createObjectBuilder()
.add(String.valueOf(timestamp), metricValue)
).build();
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog.api;
public interface MetricFields {
String METRIC_NAME = "metricname";
String APP_ID = "appid";
String INSTANCE_ID = "instanceid";
String HOSTNAME = "hostname";
String TIMESTAMP = "timestamp";
String START_TIME = "starttime";
String METRICS = "metrics";
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog.api;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.util.HashMap;
import java.util.Map;
/**
* Builds the overall JsonObject for the Metrics.
*/
public class MetricsBuilder {
static final String ROOT_JSON_ELEMENT = "metrics";
private final JsonBuilderFactory factory;
private long timestamp;
private String applicationId;
private String instanceId;
private String hostname;
private Map<String,String> metrics = new HashMap<>();
public MetricsBuilder(final JsonBuilderFactory factory) {
this.factory = factory;
}
public MetricsBuilder applicationId(final String applicationId) {
this.applicationId = applicationId;
return this;
}
public MetricsBuilder instanceId(final String instanceId) {
this.instanceId = instanceId;
return this;
}
public MetricsBuilder hostname(final String hostname) {
this.hostname = hostname;
return this;
}
public MetricsBuilder timestamp(final long timestamp) {
this.timestamp = timestamp;
return this;
}
public MetricsBuilder metric(final String name, String value) {
this.metrics.put(name, value);
return this;
}
public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
this.metrics.putAll(metrics);
return this;
}
public JsonObject build() {
// builds JsonObject for individual metrics
final MetricBuilder metricBuilder = new MetricBuilder(factory);
metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
for (Map.Entry<String,String> entry : metrics.entrySet()) {
metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
metricArrayBuilder.add(metricBuilder.build());
}
// add the array of metrics to a top-level json object
final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
return metricsBuilder.build();
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog.metrics;
/**
* The Metric names to send to DataDog.
*/
public interface MetricNames {
// NiFi Metrics
String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
String BYTES_SENT = "BytesSentLast5Minutes";
String FLOW_FILES_QUEUED = "FlowFilesQueued";
String BYTES_QUEUED = "BytesQueued";
String BYTES_READ = "BytesReadLast5Minutes";
String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
String ACTIVE_THREADS = "ActiveThreads";
String TOTAL_TASK_DURATION = "TotalTaskDurationSeconds";
// JVM Metrics
String JVM_UPTIME = "jvm.uptime";
String JVM_HEAP_USED = "jvm.heap_used";
String JVM_HEAP_USAGE = "jvm.heap_usage";
String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
String JVM_THREAD_COUNT = "jvm.thread_count";
String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
String JVM_GC_RUNS = "jvm.gc.runs";
String JVM_GC_TIME = "jvm.gc.time";
// Port status metrics
String INPUT_COUNT = "InputCount";
String INPUT_BYTES = "InputBytes";
String OUTPUT_COUNT = "OutputCount";
String OUTPUT_BYTES = "OutputBytes";
//Connection status metrics
String QUEUED_COUNT = "QueuedCount";
String QUEUED_BYTES = "QueuedBytes";
//Port status tags
String PORT_ID = "port-id";
String PORT_GROUP_ID = "port-group-id";
String PORT_NAME = "port-name";
//Connection status tags
String CONNECTION_ID = "connection-id";
String CONNECTION_GROUP_ID = "connection-group-id";
String CONNECTION_NAME = "connection-name";
String CONNECTION_SOURCE_ID = "connection-source-id";
String CONNECTION_SOURCE_NAME = "connection-source-name";
String CONNECTION_DESTINATION_ID = "connection-destination-id";
String CONNECTTION_DESTINATION_NAME = "connection-destination-name";
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog.metrics;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A service used to produce key/value metrics based on a given input.
*/
public class MetricsService {
//processor - specific metrics
public Map<String, Double> getProcessorMetrics(ProcessorStatus status) {
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getInputCount()));
metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getOutputCount()));
metrics.put(MetricNames.BYTES_READ, new Double(status.getInputBytes()));
metrics.put(MetricNames.BYTES_WRITTEN, new Double(status.getOutputBytes()));
metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
metrics.put(MetricNames.TOTAL_TASK_DURATION, new Double(status.getProcessingNanos()));
return metrics;
}
public Map<String, Double> getPortStatusMetrics(PortStatus status){
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
metrics.put(MetricNames.INPUT_COUNT, new Double(status.getInputCount()));
metrics.put(MetricNames.OUTPUT_COUNT, new Double(status.getOutputCount()));
metrics.put(MetricNames.INPUT_BYTES, new Double(status.getInputBytes()));
metrics.put(MetricNames.OUTPUT_BYTES, new Double(status.getOutputBytes()));
metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getFlowFilesReceived()));
metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getFlowFilesSent()));
metrics.put(MetricNames.BYTES_RECEIVED, new Double(status.getBytesReceived()));
metrics.put(MetricNames.BYTES_SENT, new Double(status.getBytesSent()));
return metrics;
}
public Map<String,String> getPortStatusTags(PortStatus status) {
final Map<String, String> portTags = new HashMap<>();
portTags.put(MetricNames.PORT_ID, status.getId());
portTags.put(MetricNames.PORT_GROUP_ID, status.getGroupId());
portTags.put(MetricNames.PORT_NAME, status.getName());
return portTags;
}
public Map<String,String> getConnectionStatusTags(ConnectionStatus status) {
final Map<String, String> connectionTags = new HashMap<>();
connectionTags.put(MetricNames.CONNECTION_ID, status.getId());
connectionTags.put(MetricNames.CONNECTION_NAME, status.getName());
connectionTags.put(MetricNames.CONNECTION_GROUP_ID, status.getGroupId());
connectionTags.put(MetricNames.CONNECTION_DESTINATION_ID, status.getDestinationId());
connectionTags.put(MetricNames.CONNECTTION_DESTINATION_NAME, status.getDestinationName());
connectionTags.put(MetricNames.CONNECTION_SOURCE_ID, status.getSourceId());
connectionTags.put(MetricNames.CONNECTION_SOURCE_NAME, status.getSourceName());
return connectionTags;
}
public Map<String, Double> getConnectionStatusMetrics(ConnectionStatus status) {
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.INPUT_COUNT, new Double(status.getInputCount()));
metrics.put(MetricNames.INPUT_BYTES, new Double(status.getInputBytes()));
metrics.put(MetricNames.QUEUED_COUNT, new Double(status.getQueuedCount()));
metrics.put(MetricNames.QUEUED_BYTES, new Double(status.getQueuedBytes()));
metrics.put(MetricNames.OUTPUT_COUNT, new Double(status.getOutputCount()));
metrics.put(MetricNames.OUTPUT_BYTES, new Double(status.getOutputBytes()));
return metrics;
}
//general metrics for whole dataflow
public Map<String, Double> getDataFlowMetrics(ProcessGroupStatus status) {
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getFlowFilesReceived()));
metrics.put(MetricNames.BYTES_RECEIVED, new Double(status.getBytesReceived()));
metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getFlowFilesSent()));
metrics.put(MetricNames.BYTES_SENT, new Double(status.getBytesSent()));
metrics.put(MetricNames.FLOW_FILES_QUEUED, new Double(status.getQueuedCount()));
metrics.put(MetricNames.BYTES_QUEUED, new Double(status.getQueuedContentSize()));
metrics.put(MetricNames.BYTES_READ, new Double(status.getBytesRead()));
metrics.put(MetricNames.BYTES_WRITTEN, new Double(status.getBytesWritten()));
metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
metrics.put(MetricNames.TOTAL_TASK_DURATION, new Double(calculateProcessingNanos(status)));
status.getOutputPortStatus();
return metrics;
}
public List<String> getAllTagsList () {
List<String> tagsList = new ArrayList<>();
tagsList.add("env");
tagsList.add("dataflow_id");
tagsList.add(MetricNames.PORT_ID);
tagsList.add(MetricNames.PORT_NAME);
tagsList.add(MetricNames.PORT_GROUP_ID);
tagsList.add(MetricNames.CONNECTION_ID);
tagsList.add(MetricNames.CONNECTION_NAME);
tagsList.add(MetricNames.CONNECTION_GROUP_ID);
tagsList.add(MetricNames.CONNECTION_SOURCE_ID);
tagsList.add(MetricNames.CONNECTION_SOURCE_NAME);
tagsList.add(MetricNames.CONNECTION_DESTINATION_ID);
tagsList.add(MetricNames.CONNECTTION_DESTINATION_NAME);
return tagsList;
}
//virtual machine metrics
public Map<String, Double> getJVMMetrics(VirtualMachineMetrics virtualMachineMetrics) {
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.JVM_UPTIME, new Double(virtualMachineMetrics.uptime()));
metrics.put(MetricNames.JVM_HEAP_USED, new Double(virtualMachineMetrics.heapUsed()));
metrics.put(MetricNames.JVM_HEAP_USAGE, new Double(virtualMachineMetrics.heapUsage()));
metrics.put(MetricNames.JVM_NON_HEAP_USAGE, new Double(virtualMachineMetrics.nonHeapUsage()));
metrics.put(MetricNames.JVM_THREAD_COUNT, new Double(virtualMachineMetrics.threadCount()));
metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, new Double(virtualMachineMetrics.daemonThreadCount()));
metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, new Double(virtualMachineMetrics.fileDescriptorUsage()));
for (Map.Entry<Thread.State, Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
switch (entry.getKey()) {
case BLOCKED:
metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, new Double(normalizedValue));
break;
case RUNNABLE:
metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, new Double(normalizedValue));
break;
case TERMINATED:
metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, new Double(normalizedValue));
break;
case TIMED_WAITING:
metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, new Double(normalizedValue));
break;
default:
break;
}
}
for (Map.Entry<String, VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
final String gcName = entry.getKey().replace(" ", "");
final long runs = entry.getValue().getRuns();
final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName,new Double(runs));
metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, new Double(timeMS));
}
return metrics;
}
// calculates the total processing time of all processors in nanos
protected long calculateProcessingNanos(final ProcessGroupStatus status) {
long nanos = 0L;
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
nanos += procStats.getProcessingNanos();
}
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
nanos += calculateProcessingNanos(childGroupStatus);
}
return nanos;
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.datadog.DataDogReportingTask

View File

@ -0,0 +1,56 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>DataDogReportingTask</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>DataDogReportingTask</h2>
<p>This ReportingTask sends the following metrics to DataDog:</p>
<ul>
<li>FlowFilesReceivedLast5Minutes</li>
<li>BytesReceivedLast5Minutes</li>
<li>FlowFilesSentLast5Minutes</li>
<li>BytesSentLast5Minutes</li>
<li>FlowFilesQueued</li>
<li>BytesQueued</li>
<li>BytesReadLast5Minutes</li>
<li>BytesWrittenLast5Minutes</li>
<li>ActiveThreads</li>
<li>TotalTaskDurationSeconds</li>
<li>jvm.uptime</li>
<li>jvm.heap_used</li>
<li>jvm.heap_usage</li>
<li>jvm.non_heap_usage</li>
<li>jvm.thread_states.runnable</li>
<li>jvm.thread_states.blocked</li>
<li>jvm.thread_states.timed_waiting</li>
<li>jvm.thread_states.terminated</li>
<li>jvm.thread_count</li>
<li>jvm.daemon_thread_count</li>
<li>jvm.file_descriptor_usage</li>
<li>jvm.gc.runs</li>
<li>jvm.gc.time</li>
</ul>
<p>
Please consult the DataDog and NiFi documentation for further details.
</p>
</body>
</html>

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AtomicDouble;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.datadog.metrics.MetricsService;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
public class TestDataDogReportingTask {
private ProcessGroupStatus status;
private ProcessorStatus procStatus;
private ConcurrentHashMap<String, AtomicDouble> metricsMap;
private MetricRegistry metricRegistry;
private MetricsService metricsService;
private String env = "dev";
private String prefix = "nifi";
private ReportingContext context;
private ReportingInitializationContext initContext;
private ConfigurationContext configurationContext;
private volatile VirtualMachineMetrics virtualMachineMetrics;
private Logger logger;
@Before
public void setup() {
initProcessGroupStatus();
initProcessorStatuses();
initContexts();
}
//init all contexts
private void initContexts() {
configurationContext = Mockito.mock(ConfigurationContext.class);
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getProperty(DataDogReportingTask.ENVIRONMENT))
.thenReturn(new MockPropertyValue(env, null));
Mockito.when(context.getProperty(DataDogReportingTask.METRICS_PREFIX))
.thenReturn(new MockPropertyValue(prefix, null));
Mockito.when(context.getProperty(DataDogReportingTask.API_KEY))
.thenReturn(new MockPropertyValue("agent", null));
Mockito.when(context.getProperty(DataDogReportingTask.DATADOG_TRANSPORT))
.thenReturn(new MockPropertyValue("DataDog Agent", null));
EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
logger = Mockito.mock(Logger.class);
initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
//Mockito.when(initContext.getLogger()).thenReturn(logger);
metricsMap = new ConcurrentHashMap<>();
metricRegistry = Mockito.mock(MetricRegistry.class);
virtualMachineMetrics = VirtualMachineMetrics.getInstance();
metricsService = Mockito.mock(MetricsService.class);
}
//test onTrigger method
@Test
public void testOnTrigger() throws InitializationException, IOException {
DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);
dataDogReportingTask.onTrigger(context);
verify(metricsService, atLeast(1)).getProcessorMetrics(Mockito.<ProcessorStatus>any());
verify(metricsService, atLeast(1)).getJVMMetrics(Mockito.<VirtualMachineMetrics>any());
}
//test updating metrics of processors
@Test
public void testUpdateMetricsProcessor() throws InitializationException, IOException {
MetricsService ms = new MetricsService();
Map<String, Double> processorMetrics = ms.getProcessorMetrics(procStatus);
Map<String, String> tagsMap = ImmutableMap.of("env", "test");
DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);
dataDogReportingTask.updateMetrics(processorMetrics, Optional.of("sampleProcessor"), tagsMap);
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"), Mockito.<Gauge>any());
}
//test updating JMV metrics
@Test
public void testUpdateMetricsJVM() throws InitializationException, IOException {
MetricsService ms = new MetricsService();
Map<String, Double> processorMetrics = ms.getJVMMetrics(virtualMachineMetrics);
Map<String, String> tagsMap = ImmutableMap.of("env", "test");
DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);
dataDogReportingTask.updateMetrics(processorMetrics, Optional.<String>absent(), tagsMap);
verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), Mockito.<Gauge>any());
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), Mockito.<Gauge>any());
}
private void initProcessGroupStatus() {
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
status.setInputCount(2);
status.setOutputCount(4);
}
private void initProcessorStatuses() {
procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
procStatus.setInputCount(2);
procStatus.setOutputCount(4);
procStatus.setActiveThreadCount(6);
procStatus.setBytesSent(1256);
procStatus.setName("sampleProcessor");
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
status.setProcessGroupStatus(groupStatuses);
}
private class TestableDataDogReportingTask extends DataDogReportingTask {
@Override
protected MetricsService getMetricsService() {
return metricsService;
}
@Override
protected DDMetricRegistryBuilder getMetricRegistryBuilder() {
return new DDMetricRegistryBuilder();
}
@Override
protected MetricRegistry getMetricRegistry() {
return metricRegistry;
}
@Override
protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
return metricsMap;
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.datadog;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.datadog.metrics.MetricNames;
import org.apache.nifi.reporting.datadog.metrics.MetricsService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestMetricsService {
private ProcessGroupStatus status;
private MetricsService metricsService;
@Before
public void init() {
status = new ProcessGroupStatus();
metricsService = new MetricsService();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
}
//test group status metric retreiveing
@Test
public void testGetProcessGroupStatusMetrics() {
ProcessorStatus procStatus = new ProcessorStatus();
List<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
final Map<String, Double> metrics = metricsService.getDataFlowMetrics(status);
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
}
//test processor status metric retreiveing
@Test
public void testGetProcessorGroupStatusMetrics() {
ProcessorStatus procStatus = new ProcessorStatus();
List<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
final Map<String, Double> metrics = metricsService.getProcessorMetrics(procStatus);
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
}
//test JVM status metric retreiveing
@Test
public void testGetVirtualMachineMetrics() {
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
final Map<String, Double> metrics = metricsService.getJVMMetrics(virtualMachineMetrics);
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
}
}

View File

@ -0,0 +1,41 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-datadog-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-datadog-reporting-task</module>
<module>nifi-datadog-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.19</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

1
nifi-nar-bundles/pom.xml Normal file → Executable file
View File

@ -66,6 +66,7 @@
<module>nifi-evtx-bundle</module>
<module>nifi-slack-bundle</module>
<module>nifi-snmp-bundle</module>
<module>nifi-datadog-bundle</module>
<module>nifi-windows-event-log-bundle</module>
<module>nifi-ignite-bundle</module>
<module>nifi-email-bundle</module>