diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE
index 7687752ffa..346bf945bd 100644
--- a/nifi-assembly/LICENSE
+++ b/nifi-assembly/LICENSE
@@ -1487,3 +1487,33 @@ This project bundles 'Jython' which is available under a Python Software Foundat
OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+This project bundles 'metrics-datadog' which is available
+ under a BSD license.
+
+Copyright (c) 2014, Vistar Media
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of Vistar Media nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index a1cd33e5a2..0a98bd13eb 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -59,6 +59,14 @@ The following binary components are provided under the Apache Software License v
Tatu Saloranta (http://wiki.fasterxml.com/TatuSaloranta)
+ (ASLv2) Google Guava
+ The following NOTICE information applies:
+ Copyright 2011 Guava International Limited
+
+ (ASLv2) Dropwizard Metrics
+ The following NOTICE information applies:
+ Copyright (c) 2010-2013 Coda Hale, Yammer.com
+
(ASLv2) Jasypt
The following NOTICE information applies:
Copyright (c) 2007-2010, The JASYPT team (http://www.jasypt.org)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
old mode 100644
new mode 100755
index 1cf5bc2853..7228a51412
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -83,6 +83,7 @@ language governing permissions and limitations under the License. -->
org.apache.nifinifi-framework-api
+ 1.0.0-SNAPSHOTorg.apache.nifi
@@ -131,6 +132,12 @@ language governing permissions and limitations under the License. -->
nifi-distributed-cache-services-narnar
+
+ org.apache.nifi
+ nifi-datadog-nar
+ 1.0.0-SNAPSHOT
+ nar
+ org.apache.nifinifi-standard-nar
@@ -367,7 +374,147 @@ language governing permissions and limitations under the License. -->
nar
+
+
+ 512
+ 128
+
+
+ ${project.version}
+ true
+ 10 sec
+ 500 ms
+ 30 sec
+ 10 millis
+ ./conf/flow.xml.gz
+ true
+ ./conf/archive/
+ 30 days
+ 500 MB
+ ./conf/login-identity-providers.xml
+ ./conf/authorizers.xml
+ ./conf/templates
+ ./database_repository
+
+ ./conf/state-management.xml
+ false
+ ./conf/zookeeper.properties
+ local-provider
+ zk-provider
+
+ org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
+ ./flowfile_repository
+ 256
+ 2 mins
+ false
+ org.apache.nifi.controller.FileSystemSwapManager
+ 20000
+ 5 sec
+ 1
+ 5 sec
+ 4
+
+ org.apache.nifi.controller.repository.FileSystemRepository
+ 10 MB
+ 100
+ ./content_repository
+ 12 hours
+ 50%
+ true
+ false
+ /nifi-content-viewer/
+
+
+
+ 30 sec
+ ./lib
+ ./work/nar/
+ ./work/docs/components
+
+ PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ BC
+ ;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+ 9990
+
+
+ org.apache.nifi.provenance.PersistentProvenanceRepository
+ ./provenance_repository
+ 24 hours
+ 1 GB
+ 30 secs
+ 100 MB
+ 2
+ 1
+ true
+ EventType, FlowFileUUID, Filename, ProcessorID, Relationship
+
+ 500 MB
+ false
+ 16
+ 65536
+
+
+ 100000
+
+
+ org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
+ 1440
+ 1 min
+
+
+ ./lib
+
+ 8080
+
+
+ ./work/jetty
+ 200
+
+
+
+
+
+
+
+
+
+
+ file-provider
+
+
+
+
+
+
+ 5 sec
+ false
+
+
+ false
+
+
+ 10
+ 25
+ 5 sec
+ 5 sec
+
+
+ 15 secs
+
+
+
+ 3 secs
+ 3 secs
+ /nifi
+
+
+
+
+
+ 12 hours
+ rpm
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 7092dd08c4..5080583ff2 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -137,7 +137,6 @@ public class AmbariReportingTask extends AbstractReportingTask {
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
}
-
}
// calculate the current metrics, but store them to be sent next time
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/pom.xml
new file mode 100644
index 0000000000..eff4df560a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/pom.xml
@@ -0,0 +1,36 @@
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-datadog-bundle
+ 1.0.0-SNAPSHOT
+
+
+ nifi-datadog-nar
+ nar
+
+ true
+ true
+
+
+
+ org.apache.nifi
+ nifi-datadog-reporting-task
+ 1.0.0-SNAPSHOT
+
+
+
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..893b41caaf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,239 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+
+This project bundles 'metrics-datadog' which is available
+ under a BSD license.
+
+Copyright (c) 2014, Vistar Media
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of Vistar Media nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..0877a5a9dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,46 @@
+nifi-ambari-nar
+Copyright 2015-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Yammer Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2012 Coda Hale and Yammer, Inc.
+
+ This product includes software developed by Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
+ with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
+ (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
+ (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)
+
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml
new file mode 100644
index 0000000000..f89ef563e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml
@@ -0,0 +1,86 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-datadog-bundle
+ 1.0.0-SNAPSHOT
+
+
+ nifi-datadog-reporting-task
+ Publishes NiFi metrics to datadog
+
+
+
+ org.glassfish.jersey.core
+ jersey-client
+
+
+ org.glassfish
+ javax.json
+ 1.0.4
+
+
+ javax.json
+ javax.json-api
+ 1.0
+
+
+ com.yammer.metrics
+ metrics-core
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+ io.dropwizard.metrics
+ metrics-core
+ 3.1.0
+
+
+ org.coursera
+ metrics-datadog
+ 1.1.5
+
+
+ com.google.guava
+ guava
+ 19.0
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
new file mode 100644
index 0000000000..c00eec5fa2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+import com.codahale.metrics.MetricRegistry;
+import org.coursera.metrics.datadog.DatadogReporter;
+import org.coursera.metrics.datadog.transport.HttpTransport;
+import org.coursera.metrics.datadog.transport.Transport;
+import org.coursera.metrics.datadog.transport.UdpTransport;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Class configures MetricRegistry (passed outside or created from scratch) with Datadog support
+ */
+public class DDMetricRegistryBuilder {
+
+
+ private MetricRegistry metricRegistry = null;
+ private List tags = Arrays.asList();
+ private DatadogReporter datadogReporter;
+ private String apiKey = "";
+ private Transport transport;
+
+ public DDMetricRegistryBuilder setMetricRegistry(MetricRegistry metricRegistry) {
+ this.metricRegistry = metricRegistry;
+ return this;
+ }
+
+ public DDMetricRegistryBuilder setTags(List tags) {
+ this.tags = tags;
+ return this;
+ }
+
+ public DatadogReporter getDatadogReporter() {
+ return datadogReporter;
+ }
+
+ public MetricRegistry build(String apiKey) throws IOException {
+ if (metricRegistry == null)
+ metricRegistry = new MetricRegistry();
+
+ if (createTransport(apiKey)) {
+ datadogReporter = createDatadogReporter(this.metricRegistry);
+ }
+ return this.metricRegistry;
+ }
+
+ private boolean createTransport(String apiKey) {
+ //if api key was not changed
+ if (this.apiKey.equals(apiKey)) {
+ return false;
+ } else if (apiKey.equals("agent")) {
+ this.apiKey = "agent";
+ transport = new UdpTransport.Builder().build();
+ return true;
+ } else {
+ this.apiKey = apiKey;
+ transport = new HttpTransport.Builder().withApiKey(apiKey).build();
+ return true;
+ }
+ }
+
+ /*
+ * create DataDog reporter
+ */
+ private DatadogReporter createDatadogReporter(MetricRegistry metricRegistry) throws IOException {
+ DatadogReporter reporter =
+ DatadogReporter.forRegistry(metricRegistry)
+ .withHost(InetAddress.getLocalHost().getHostName())
+ .withTransport(transport)
+ .withTags(tags)
+ .build();
+ return reporter;
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
new file mode 100644
index 0000000000..4ff52928c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.datadog.metrics.MetricsService;
+import org.coursera.metrics.datadog.DynamicTagsCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+
+@Tags({"reporting", "datadog", "metrics"})
+@CapabilityDescription("Publishes metrics from NiFi to datadog")
+public class DataDogReportingTask extends AbstractReportingTask {
+
+ static final AllowableValue DATADOG_AGENT = new AllowableValue("DataDog Agent", "DataDog Agent",
+ "Metrics will be sent via locally installed DataDog agent. " +
+ "DataDog agent needs to be installed manually before using this option");
+
+ static final AllowableValue DATADOG_HTTP = new AllowableValue("Datadog HTTP", "Datadog HTTP",
+ "Metrics will be sent via HTTP transport with no need of Agent installed. " +
+ "DataDog API key needs to be set");
+
+ static final PropertyDescriptor DATADOG_TRANSPORT = new PropertyDescriptor.Builder()
+ .name("DataDog transport")
+ .description("Transport through which metrics will be sent to DataDog")
+ .required(true)
+ .allowableValues(DATADOG_AGENT, DATADOG_HTTP)
+ .defaultValue(DATADOG_HTTP.getValue())
+ .build();
+
+ static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
+ .name("API key")
+ .description("DataDog API key. If specified value is 'agent', local DataDog agent will be used.")
+ .expressionLanguageSupported(false)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor METRICS_PREFIX = new PropertyDescriptor.Builder()
+ .name("Metrics prefix")
+ .description("Prefix to be added before every metric")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor ENVIRONMENT = new PropertyDescriptor.Builder()
+ .name("Environment")
+ .description("Environment, dataflow is running in. " +
+ "This property will be included as metrics tag.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("dev")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private MetricsService metricsService;
+ private DDMetricRegistryBuilder ddMetricRegistryBuilder;
+ private MetricRegistry metricRegistry;
+ private String metricsPrefix;
+ private String environment;
+ private String statusId;
+ private ConcurrentHashMap metricsMap;
+ private Map defaultTags;
+ private volatile VirtualMachineMetrics virtualMachineMetrics;
+ private Logger logger = LoggerFactory.getLogger(getClass().getName());
+
+ @OnScheduled
+ public void setup(final ConfigurationContext context) {
+ metricsService = getMetricsService();
+ ddMetricRegistryBuilder = getMetricRegistryBuilder();
+ metricRegistry = getMetricRegistry();
+ metricsMap = getMetricsMap();
+ metricsPrefix = METRICS_PREFIX.getDefaultValue();
+ environment = ENVIRONMENT.getDefaultValue();
+ virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+ ddMetricRegistryBuilder.setMetricRegistry(metricRegistry)
+ .setTags(metricsService.getAllTagsList());
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>();
+ properties.add(METRICS_PREFIX);
+ properties.add(ENVIRONMENT);
+ properties.add(API_KEY);
+ properties.add(DATADOG_TRANSPORT);
+ return properties;
+ }
+
+ @Override
+ public void onTrigger(ReportingContext context) {
+ final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+
+ metricsPrefix = context.getProperty(METRICS_PREFIX).evaluateAttributeExpressions().getValue();
+ environment = context.getProperty(ENVIRONMENT).evaluateAttributeExpressions().getValue();
+ statusId = status.getId();
+ defaultTags = ImmutableMap.of("env", environment, "dataflow_id", statusId);
+ try {
+ updateDataDogTransport(context);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ updateAllMetricGroups(status);
+ ddMetricRegistryBuilder.getDatadogReporter().report();
+ }
+
+ protected void updateMetrics(Map metrics, Optional processorName, Map tags) {
+ for (Map.Entry entry : metrics.entrySet()) {
+ logger.info(entry.getKey() + ": " + entry.getValue());
+ final String metricName = buildMetricName(processorName, entry.getKey());
+ //if metric is not registered yet - register it
+ if (!metricsMap.containsKey(metricName)) {
+ metricsMap.put(metricName, new AtomicDouble(entry.getValue()));
+ metricRegistry.register(metricName, new MetricGauge(metricName, tags));
+ }
+ //set real time value to metrics map
+ metricsMap.get(metricName).set(entry.getValue());
+ }
+ }
+
+ private void updateAllMetricGroups (ProcessGroupStatus processGroupStatus) {
+ final List processorStatuses = new ArrayList<>();
+ populateProcessorStatuses(processGroupStatus, processorStatuses);
+ for (final ProcessorStatus processorStatus : processorStatuses) {
+ updateMetrics(metricsService.getProcessorMetrics(processorStatus),
+ Optional.of(processorStatus.getName()), defaultTags);
+ }
+
+ final List connectionStatuses = new ArrayList<>();
+ populateConnectionStatuses(processGroupStatus, connectionStatuses);
+ for (ConnectionStatus connectionStatus: connectionStatuses) {
+ Map connectionStatusTags = new HashMap<>(defaultTags);
+ connectionStatusTags.putAll(metricsService.getConnectionStatusTags(connectionStatus));
+ updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus), Optional.absent(), connectionStatusTags);
+ }
+
+ final List inputPortStatuses = new ArrayList<>();
+ populateInputPortStatuses(processGroupStatus, inputPortStatuses);
+ for (PortStatus portStatus: inputPortStatuses) {
+ Map portTags = new HashMap<>(defaultTags);
+ portTags.putAll(metricsService.getPortStatusTags(portStatus));
+ updateMetrics(metricsService.getPortStatusMetrics(portStatus), Optional.absent(), portTags);
+ }
+
+ final List outputPortStatuses = new ArrayList<>();
+ populateOutputPortStatuses(processGroupStatus, outputPortStatuses);
+ for (PortStatus portStatus: outputPortStatuses) {
+ Map portTags = new HashMap<>(defaultTags);
+ portTags.putAll(metricsService.getPortStatusTags(portStatus));
+ updateMetrics(metricsService.getPortStatusMetrics(portStatus), Optional.absent(), portTags);
+ }
+
+ updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics),
+ Optional.absent(), defaultTags);
+ updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus), Optional.absent(), defaultTags);
+ }
+
+ private class MetricGauge implements Gauge, DynamicTagsCallback {
+ private Map tags;
+ private String metricName;
+
+ public MetricGauge(String metricName, Map tagsMap) {
+ this.tags = tagsMap;
+ this.metricName = metricName;
+ }
+
+ @Override
+ public Object getValue() {
+ return metricsMap.get(metricName).get();
+ }
+
+ @Override
+ public List getTags() {
+ List tagsList = Lists.newArrayList();
+ for (Map.Entry entry : tags.entrySet()) {
+ tagsList.add(entry.getKey() + ":" + entry.getValue());
+ }
+ return tagsList;
+ }
+ }
+
+ private void updateDataDogTransport(ReportingContext context) throws IOException {
+ String dataDogTransport = context.getProperty(DATADOG_TRANSPORT).getValue();
+ if (dataDogTransport.equalsIgnoreCase(DATADOG_AGENT.getValue())) {
+ ddMetricRegistryBuilder.build("agent");
+ } else if (dataDogTransport.equalsIgnoreCase(DATADOG_HTTP.getValue())
+ && context.getProperty(API_KEY).isSet()) {
+ ddMetricRegistryBuilder.build(context.getProperty(API_KEY).getValue());
+ }
+ }
+
+ private void populateProcessorStatuses(final ProcessGroupStatus groupStatus, final List statuses) {
+ statuses.addAll(groupStatus.getProcessorStatus());
+ for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
+ populateProcessorStatuses(childGroupStatus, statuses);
+ }
+ }
+
+ private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List statuses) {
+ statuses.addAll(groupStatus.getConnectionStatus());
+ for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
+ populateConnectionStatuses(childGroupStatus, statuses);
+ }
+ }
+
+ private void populateInputPortStatuses(final ProcessGroupStatus groupStatus, final List statuses) {
+ statuses.addAll(groupStatus.getInputPortStatus());
+ for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
+ populateInputPortStatuses(childGroupStatus, statuses);
+ }
+ }
+
+ private void populateOutputPortStatuses(final ProcessGroupStatus groupStatus, final List statuses) {
+ statuses.addAll(groupStatus.getOutputPortStatus());
+ for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
+ populateOutputPortStatuses(childGroupStatus, statuses);
+ }
+ }
+
+ private String buildMetricName(Optional processorName, String metricName) {
+ return metricsPrefix + "." + processorName.or("flow") + "." + metricName;
+ }
+
+ protected MetricsService getMetricsService() {
+ return new MetricsService();
+ }
+
+ protected DDMetricRegistryBuilder getMetricRegistryBuilder() {
+ return new DDMetricRegistryBuilder();
+ }
+
+ protected MetricRegistry getMetricRegistry() {
+ return new MetricRegistry();
+ }
+
+ protected ConcurrentHashMap getMetricsMap() {
+ return new ConcurrentHashMap<>();
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricBuilder.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricBuilder.java
new file mode 100644
index 0000000000..d5de4c7083
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ */
+public class MetricBuilder {
+
+ private final JsonBuilderFactory factory;
+
+ private String applicationId;
+ private String instanceId;
+ private String hostname;
+ private String timestamp;
+ private String metricName;
+ private String metricValue;
+
+ public MetricBuilder(final JsonBuilderFactory factory) {
+ this.factory = factory;
+ }
+
+ public MetricBuilder applicationId(final String applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
+ public MetricBuilder instanceId(final String instanceId) {
+ this.instanceId = instanceId;
+ return this;
+ }
+
+ public MetricBuilder hostname(final String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public MetricBuilder timestamp(final long timestamp) {
+ this.timestamp = String.valueOf(timestamp);
+ return this;
+ }
+
+ public MetricBuilder metricName(final String metricName) {
+ this.metricName = metricName;
+ return this;
+ }
+
+ public MetricBuilder metricValue(final String metricValue) {
+ this.metricValue = metricValue;
+ return this;
+ }
+
+ public JsonObject build() {
+ return factory.createObjectBuilder()
+ .add(MetricFields.METRIC_NAME, metricName)
+ .add(MetricFields.APP_ID, applicationId)
+ .add(MetricFields.INSTANCE_ID, instanceId)
+ .add(MetricFields.HOSTNAME, hostname)
+ .add(MetricFields.TIMESTAMP, timestamp)
+ .add(MetricFields.START_TIME, timestamp)
+ .add(MetricFields.METRICS,
+ factory.createObjectBuilder()
+ .add(String.valueOf(timestamp), metricValue)
+ ).build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricFields.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricFields.java
new file mode 100644
index 0000000000..df0291da63
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog.api;
+
+public interface MetricFields {
+
+ String METRIC_NAME = "metricname";
+ String APP_ID = "appid";
+ String INSTANCE_ID = "instanceid";
+ String HOSTNAME = "hostname";
+ String TIMESTAMP = "timestamp";
+ String START_TIME = "starttime";
+ String METRICS = "metrics";
+
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricsBuilder.java
new file mode 100644
index 0000000000..3aa13b0612
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds the overall JsonObject for the Metrics.
+ */
+public class MetricsBuilder {
+
+ static final String ROOT_JSON_ELEMENT = "metrics";
+
+ private final JsonBuilderFactory factory;
+
+ private long timestamp;
+ private String applicationId;
+ private String instanceId;
+ private String hostname;
+ private Map metrics = new HashMap<>();
+
+ public MetricsBuilder(final JsonBuilderFactory factory) {
+ this.factory = factory;
+ }
+
+ public MetricsBuilder applicationId(final String applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
+ public MetricsBuilder instanceId(final String instanceId) {
+ this.instanceId = instanceId;
+ return this;
+ }
+
+ public MetricsBuilder hostname(final String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public MetricsBuilder timestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public MetricsBuilder metric(final String name, String value) {
+ this.metrics.put(name, value);
+ return this;
+ }
+
+ public MetricsBuilder addAllMetrics(final Map metrics) {
+ this.metrics.putAll(metrics);
+ return this;
+ }
+
+ public JsonObject build() {
+ // builds JsonObject for individual metrics
+ final MetricBuilder metricBuilder = new MetricBuilder(factory);
+ metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
+
+ final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
+
+ for (Map.Entry entry : metrics.entrySet()) {
+ metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
+ metricArrayBuilder.add(metricBuilder.build());
+ }
+
+ // add the array of metrics to a top-level json object
+ final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
+ metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
+ return metricsBuilder.build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
new file mode 100644
index 0000000000..b176a3313a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog.metrics;
+
+/**
+ * The Metric names to send to DataDog.
+ */
+public interface MetricNames {
+
+ // NiFi Metrics
+ String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+ String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+ String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+ String BYTES_SENT = "BytesSentLast5Minutes";
+ String FLOW_FILES_QUEUED = "FlowFilesQueued";
+ String BYTES_QUEUED = "BytesQueued";
+ String BYTES_READ = "BytesReadLast5Minutes";
+ String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+ String ACTIVE_THREADS = "ActiveThreads";
+ String TOTAL_TASK_DURATION = "TotalTaskDurationSeconds";
+
+ // JVM Metrics
+ String JVM_UPTIME = "jvm.uptime";
+ String JVM_HEAP_USED = "jvm.heap_used";
+ String JVM_HEAP_USAGE = "jvm.heap_usage";
+ String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+ String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+ String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+ String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+ String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+ String JVM_THREAD_COUNT = "jvm.thread_count";
+ String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+ String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+ String JVM_GC_RUNS = "jvm.gc.runs";
+ String JVM_GC_TIME = "jvm.gc.time";
+
+ // Port status metrics
+ String INPUT_COUNT = "InputCount";
+ String INPUT_BYTES = "InputBytes";
+ String OUTPUT_COUNT = "OutputCount";
+ String OUTPUT_BYTES = "OutputBytes";
+
+ //Connection status metrics
+ String QUEUED_COUNT = "QueuedCount";
+ String QUEUED_BYTES = "QueuedBytes";
+
+ //Port status tags
+ String PORT_ID = "port-id";
+ String PORT_GROUP_ID = "port-group-id";
+ String PORT_NAME = "port-name";
+
+ //Connection status tags
+ String CONNECTION_ID = "connection-id";
+ String CONNECTION_GROUP_ID = "connection-group-id";
+ String CONNECTION_NAME = "connection-name";
+ String CONNECTION_SOURCE_ID = "connection-source-id";
+ String CONNECTION_SOURCE_NAME = "connection-source-name";
+ String CONNECTION_DESTINATION_ID = "connection-destination-id";
+ String CONNECTTION_DESTINATION_NAME = "connection-destination-name";
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
new file mode 100644
index 0000000000..cf1a625cbc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog.metrics;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+ //processor - specific metrics
+ public Map getProcessorMetrics(ProcessorStatus status) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getInputCount()));
+ metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getOutputCount()));
+ metrics.put(MetricNames.BYTES_READ, new Double(status.getInputBytes()));
+ metrics.put(MetricNames.BYTES_WRITTEN, new Double(status.getOutputBytes()));
+ metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
+ metrics.put(MetricNames.TOTAL_TASK_DURATION, new Double(status.getProcessingNanos()));
+ return metrics;
+ }
+
+ public Map getPortStatusMetrics(PortStatus status){
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
+ metrics.put(MetricNames.INPUT_COUNT, new Double(status.getInputCount()));
+ metrics.put(MetricNames.OUTPUT_COUNT, new Double(status.getOutputCount()));
+ metrics.put(MetricNames.INPUT_BYTES, new Double(status.getInputBytes()));
+ metrics.put(MetricNames.OUTPUT_BYTES, new Double(status.getOutputBytes()));
+ metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getFlowFilesReceived()));
+ metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getFlowFilesSent()));
+ metrics.put(MetricNames.BYTES_RECEIVED, new Double(status.getBytesReceived()));
+ metrics.put(MetricNames.BYTES_SENT, new Double(status.getBytesSent()));
+ return metrics;
+ }
+
+ public Map getPortStatusTags(PortStatus status) {
+ final Map portTags = new HashMap<>();
+ portTags.put(MetricNames.PORT_ID, status.getId());
+ portTags.put(MetricNames.PORT_GROUP_ID, status.getGroupId());
+ portTags.put(MetricNames.PORT_NAME, status.getName());
+ return portTags;
+ }
+
+ public Map getConnectionStatusTags(ConnectionStatus status) {
+ final Map connectionTags = new HashMap<>();
+ connectionTags.put(MetricNames.CONNECTION_ID, status.getId());
+ connectionTags.put(MetricNames.CONNECTION_NAME, status.getName());
+ connectionTags.put(MetricNames.CONNECTION_GROUP_ID, status.getGroupId());
+ connectionTags.put(MetricNames.CONNECTION_DESTINATION_ID, status.getDestinationId());
+ connectionTags.put(MetricNames.CONNECTTION_DESTINATION_NAME, status.getDestinationName());
+ connectionTags.put(MetricNames.CONNECTION_SOURCE_ID, status.getSourceId());
+ connectionTags.put(MetricNames.CONNECTION_SOURCE_NAME, status.getSourceName());
+ return connectionTags;
+ }
+
+ public Map getConnectionStatusMetrics(ConnectionStatus status) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.INPUT_COUNT, new Double(status.getInputCount()));
+ metrics.put(MetricNames.INPUT_BYTES, new Double(status.getInputBytes()));
+ metrics.put(MetricNames.QUEUED_COUNT, new Double(status.getQueuedCount()));
+ metrics.put(MetricNames.QUEUED_BYTES, new Double(status.getQueuedBytes()));
+ metrics.put(MetricNames.OUTPUT_COUNT, new Double(status.getOutputCount()));
+ metrics.put(MetricNames.OUTPUT_BYTES, new Double(status.getOutputBytes()));
+ return metrics;
+ }
+
+
+ //general metrics for whole dataflow
+ public Map getDataFlowMetrics(ProcessGroupStatus status) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.FLOW_FILES_RECEIVED, new Double(status.getFlowFilesReceived()));
+ metrics.put(MetricNames.BYTES_RECEIVED, new Double(status.getBytesReceived()));
+ metrics.put(MetricNames.FLOW_FILES_SENT, new Double(status.getFlowFilesSent()));
+ metrics.put(MetricNames.BYTES_SENT, new Double(status.getBytesSent()));
+ metrics.put(MetricNames.FLOW_FILES_QUEUED, new Double(status.getQueuedCount()));
+ metrics.put(MetricNames.BYTES_QUEUED, new Double(status.getQueuedContentSize()));
+ metrics.put(MetricNames.BYTES_READ, new Double(status.getBytesRead()));
+ metrics.put(MetricNames.BYTES_WRITTEN, new Double(status.getBytesWritten()));
+ metrics.put(MetricNames.ACTIVE_THREADS, new Double(status.getActiveThreadCount()));
+ metrics.put(MetricNames.TOTAL_TASK_DURATION, new Double(calculateProcessingNanos(status)));
+ status.getOutputPortStatus();
+ return metrics;
+ }
+
+ public List getAllTagsList () {
+ List tagsList = new ArrayList<>();
+ tagsList.add("env");
+ tagsList.add("dataflow_id");
+ tagsList.add(MetricNames.PORT_ID);
+ tagsList.add(MetricNames.PORT_NAME);
+ tagsList.add(MetricNames.PORT_GROUP_ID);
+ tagsList.add(MetricNames.CONNECTION_ID);
+ tagsList.add(MetricNames.CONNECTION_NAME);
+ tagsList.add(MetricNames.CONNECTION_GROUP_ID);
+ tagsList.add(MetricNames.CONNECTION_SOURCE_ID);
+ tagsList.add(MetricNames.CONNECTION_SOURCE_NAME);
+ tagsList.add(MetricNames.CONNECTION_DESTINATION_ID);
+ tagsList.add(MetricNames.CONNECTTION_DESTINATION_NAME);
+ return tagsList;
+ }
+
+ //virtual machine metrics
+ public Map getJVMMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.JVM_UPTIME, new Double(virtualMachineMetrics.uptime()));
+ metrics.put(MetricNames.JVM_HEAP_USED, new Double(virtualMachineMetrics.heapUsed()));
+ metrics.put(MetricNames.JVM_HEAP_USAGE, new Double(virtualMachineMetrics.heapUsage()));
+ metrics.put(MetricNames.JVM_NON_HEAP_USAGE, new Double(virtualMachineMetrics.nonHeapUsage()));
+ metrics.put(MetricNames.JVM_THREAD_COUNT, new Double(virtualMachineMetrics.threadCount()));
+ metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, new Double(virtualMachineMetrics.daemonThreadCount()));
+ metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, new Double(virtualMachineMetrics.fileDescriptorUsage()));
+
+ for (Map.Entry entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
+ final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
+ switch (entry.getKey()) {
+ case BLOCKED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, new Double(normalizedValue));
+ break;
+ case RUNNABLE:
+ metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, new Double(normalizedValue));
+ break;
+ case TERMINATED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, new Double(normalizedValue));
+ break;
+ case TIMED_WAITING:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, new Double(normalizedValue));
+ break;
+ default:
+ break;
+ }
+ }
+
+ for (Map.Entry entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
+ final String gcName = entry.getKey().replace(" ", "");
+ final long runs = entry.getValue().getRuns();
+ final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
+ metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName,new Double(runs));
+ metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, new Double(timeMS));
+ }
+
+ return metrics;
+ }
+
+
+ // calculates the total processing time of all processors in nanos
+ protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+ long nanos = 0L;
+
+ for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+ nanos += procStats.getProcessingNanos();
+ }
+
+ for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+ nanos += calculateProcessingNanos(childGroupStatus);
+ }
+
+ return nanos;
+ }
+
+
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000000..23be9d75c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.reporting.datadog.DataDogReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html
new file mode 100644
index 0000000000..43bac38d01
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html
@@ -0,0 +1,56 @@
+
+
+
+
+
+ DataDogReportingTask
+
+
+
+
+
DataDogReportingTask
+
+
This ReportingTask sends the following metrics to DataDog:
+
+
FlowFilesReceivedLast5Minutes
+
BytesReceivedLast5Minutes
+
FlowFilesSentLast5Minutes
+
BytesSentLast5Minutes
+
FlowFilesQueued
+
BytesQueued
+
BytesReadLast5Minutes
+
BytesWrittenLast5Minutes
+
ActiveThreads
+
TotalTaskDurationSeconds
+
jvm.uptime
+
jvm.heap_used
+
jvm.heap_usage
+
jvm.non_heap_usage
+
jvm.thread_states.runnable
+
jvm.thread_states.blocked
+
jvm.thread_states.timed_waiting
+
jvm.thread_states.terminated
+
jvm.thread_count
+
jvm.daemon_thread_count
+
jvm.file_descriptor_usage
+
jvm.gc.runs
+
jvm.gc.time
+
+
+ Please consult the DataDog and NiFi documentation for further details.
+
+
+
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
new file mode 100644
index 0000000000..b785d40c19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.datadog.metrics.MetricsService;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.verify;
+
+
+public class TestDataDogReportingTask {
+
+ private ProcessGroupStatus status;
+ private ProcessorStatus procStatus;
+ private ConcurrentHashMap metricsMap;
+ private MetricRegistry metricRegistry;
+ private MetricsService metricsService;
+ private String env = "dev";
+ private String prefix = "nifi";
+ private ReportingContext context;
+ private ReportingInitializationContext initContext;
+ private ConfigurationContext configurationContext;
+ private volatile VirtualMachineMetrics virtualMachineMetrics;
+ private Logger logger;
+
+ @Before
+ public void setup() {
+ initProcessGroupStatus();
+ initProcessorStatuses();
+ initContexts();
+ }
+
+ //init all contexts
+ private void initContexts() {
+ configurationContext = Mockito.mock(ConfigurationContext.class);
+ context = Mockito.mock(ReportingContext.class);
+ Mockito.when(context.getProperty(DataDogReportingTask.ENVIRONMENT))
+ .thenReturn(new MockPropertyValue(env, null));
+ Mockito.when(context.getProperty(DataDogReportingTask.METRICS_PREFIX))
+ .thenReturn(new MockPropertyValue(prefix, null));
+ Mockito.when(context.getProperty(DataDogReportingTask.API_KEY))
+ .thenReturn(new MockPropertyValue("agent", null));
+ Mockito.when(context.getProperty(DataDogReportingTask.DATADOG_TRANSPORT))
+ .thenReturn(new MockPropertyValue("DataDog Agent", null));
+ EventAccess eventAccess = Mockito.mock(EventAccess.class);
+ Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
+ Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+
+ logger = Mockito.mock(Logger.class);
+ initContext = Mockito.mock(ReportingInitializationContext.class);
+ Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+ //Mockito.when(initContext.getLogger()).thenReturn(logger);
+ metricsMap = new ConcurrentHashMap<>();
+ metricRegistry = Mockito.mock(MetricRegistry.class);
+ virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+ metricsService = Mockito.mock(MetricsService.class);
+
+ }
+
+ //test onTrigger method
+ @Test
+ public void testOnTrigger() throws InitializationException, IOException {
+ DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+ dataDogReportingTask.initialize(initContext);
+ dataDogReportingTask.setup(configurationContext);
+ dataDogReportingTask.onTrigger(context);
+
+ verify(metricsService, atLeast(1)).getProcessorMetrics(Mockito.any());
+ verify(metricsService, atLeast(1)).getJVMMetrics(Mockito.any());
+ }
+
+
+ //test updating metrics of processors
+ @Test
+ public void testUpdateMetricsProcessor() throws InitializationException, IOException {
+ MetricsService ms = new MetricsService();
+ Map processorMetrics = ms.getProcessorMetrics(procStatus);
+ Map tagsMap = ImmutableMap.of("env", "test");
+ DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+ dataDogReportingTask.initialize(initContext);
+ dataDogReportingTask.setup(configurationContext);
+ dataDogReportingTask.updateMetrics(processorMetrics, Optional.of("sampleProcessor"), tagsMap);
+
+ verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"), Mockito.any());
+ }
+
+ //test updating JMV metrics
+ @Test
+ public void testUpdateMetricsJVM() throws InitializationException, IOException {
+ MetricsService ms = new MetricsService();
+ Map processorMetrics = ms.getJVMMetrics(virtualMachineMetrics);
+ Map tagsMap = ImmutableMap.of("env", "test");
+
+ DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+ dataDogReportingTask.initialize(initContext);
+ dataDogReportingTask.setup(configurationContext);
+
+ dataDogReportingTask.updateMetrics(processorMetrics, Optional.absent(), tagsMap);
+ verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), Mockito.any());
+ verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), Mockito.any());
+ }
+
+
+ private void initProcessGroupStatus() {
+ status = new ProcessGroupStatus();
+ status.setId("1234");
+ status.setFlowFilesReceived(5);
+ status.setBytesReceived(10000);
+ status.setFlowFilesSent(10);
+ status.setBytesSent(20000);
+ status.setQueuedCount(100);
+ status.setQueuedContentSize(1024L);
+ status.setBytesRead(60000L);
+ status.setBytesWritten(80000L);
+ status.setActiveThreadCount(5);
+ status.setInputCount(2);
+ status.setOutputCount(4);
+ }
+
+ private void initProcessorStatuses() {
+ procStatus = new ProcessorStatus();
+ procStatus.setProcessingNanos(123456789);
+ procStatus.setInputCount(2);
+ procStatus.setOutputCount(4);
+ procStatus.setActiveThreadCount(6);
+ procStatus.setBytesSent(1256);
+ procStatus.setName("sampleProcessor");
+ Collection processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+ groupStatus.setProcessorStatus(processorStatuses);
+
+ Collection groupStatuses = new ArrayList<>();
+ groupStatuses.add(groupStatus);
+ status.setProcessGroupStatus(groupStatuses);
+ }
+
+ private class TestableDataDogReportingTask extends DataDogReportingTask {
+ @Override
+ protected MetricsService getMetricsService() {
+ return metricsService;
+ }
+
+ @Override
+ protected DDMetricRegistryBuilder getMetricRegistryBuilder() {
+ return new DDMetricRegistryBuilder();
+ }
+
+ @Override
+ protected MetricRegistry getMetricRegistry() {
+ return metricRegistry;
+ }
+
+ @Override
+ protected ConcurrentHashMap getMetricsMap() {
+ return metricsMap;
+ }
+
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java
new file mode 100644
index 0000000000..f50cbb171c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.datadog.metrics.MetricNames;
+import org.apache.nifi.reporting.datadog.metrics.MetricsService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestMetricsService {
+
+ private ProcessGroupStatus status;
+ private MetricsService metricsService;
+
+ @Before
+ public void init() {
+ status = new ProcessGroupStatus();
+ metricsService = new MetricsService();
+ status.setId("1234");
+ status.setFlowFilesReceived(5);
+ status.setBytesReceived(10000);
+ status.setFlowFilesSent(10);
+ status.setBytesSent(20000);
+ status.setQueuedCount(100);
+ status.setQueuedContentSize(1024L);
+ status.setBytesRead(60000L);
+ status.setBytesWritten(80000L);
+ status.setActiveThreadCount(5);
+ }
+
+ //test group status metric retreiveing
+ @Test
+ public void testGetProcessGroupStatusMetrics() {
+ ProcessorStatus procStatus = new ProcessorStatus();
+ List processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ final Map metrics = metricsService.getDataFlowMetrics(status);
+
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
+ Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
+ }
+
+ //test processor status metric retreiveing
+ @Test
+ public void testGetProcessorGroupStatusMetrics() {
+ ProcessorStatus procStatus = new ProcessorStatus();
+ List processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ final Map metrics = metricsService.getProcessorMetrics(procStatus);
+
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
+ }
+
+ //test JVM status metric retreiveing
+ @Test
+ public void testGetVirtualMachineMetrics() {
+ final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+
+ final Map metrics = metricsService.getJVMMetrics(virtualMachineMetrics);
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml
new file mode 100644
index 0000000000..eb6b87c635
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-nar-bundles
+ 1.0.0-SNAPSHOT
+
+
+ nifi-datadog-bundle
+ pom
+
+
+ nifi-datadog-reporting-task
+ nifi-datadog-nar
+
+
+
+
+
+ org.glassfish.jersey.core
+ jersey-client
+ 2.19
+
+
+
+
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
old mode 100644
new mode 100755
index 6ebd2daca8..f785c87e71
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -66,6 +66,7 @@
nifi-evtx-bundlenifi-slack-bundlenifi-snmp-bundle
+ nifi-datadog-bundlenifi-windows-event-log-bundlenifi-ignite-bundlenifi-email-bundle