diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index f972952af1..4efa96f2bb 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -710,7 +710,9 @@ Common Development and Distribution License 1.1
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.19 - https://jersey.java.net/jersey-spring/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.19 - https://jersey.java.net/jersey-servlet/)
@@ -729,6 +731,18 @@ The following binary components are provided under the Common Development and Di
(CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages(TM) Standard Tag Library API (javax.servlet.jsp.jstl:javax.servlet.jsp.jstl-api:jar:1.2.1 - http://jcp.org/en/jsr/detail?id=52)
(CDDL 1.1) (GPL2 w/ CPE) Java Servlet API (javax.servlet:javax.servlet-api:jar:3.1.0 - http://servlet-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
+ (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
+ (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
+
************************
Common Development and Distribution License 1.0
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 60a6545f33..1f1522787d 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -212,6 +212,11 @@ language governing permissions and limitations under the License. -->
nifi-aws-narnar
+
+ org.apache.nifi
+ nifi-ambari-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml
new file mode 100644
index 0000000000..ec31d4f672
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml
@@ -0,0 +1,33 @@
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-ambari-bundle
+ 0.3.0-SNAPSHOT
+
+
+ nifi-ambari-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-ambari-reporting-task
+ 0.3.0-SNAPSHOT
+
+
+
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..03ccfd8384
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,46 @@
+nifi-ambari-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Yammer Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2012 Coda Hale and Yammer, Inc.
+
+ This product includes software developed by Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
+ with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
+ (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
+ (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
+ (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
+ (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
+ (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)
+
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
new file mode 100644
index 0000000000..ac5a9370d1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-ambari-bundle
+ 0.3.0-SNAPSHOT
+
+
+ nifi-ambari-reporting-task
+ Publishes NiFi metrics to Ambari
+
+
+
+ org.glassfish.jersey.core
+ jersey-client
+
+
+ org.glassfish
+ javax.json
+ 1.0.4
+
+
+ javax.json
+ javax.json-api
+ 1.0
+
+
+ com.yammer.metrics
+ metrics-core
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
new file mode 100644
index 0000000000..cff0b48924
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
+import org.apache.nifi.reporting.ambari.metrics.MetricsService;
+
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"reporting", "ambari", "metrics"})
+@CapabilityDescription("Publishes metrics from NiFi to Ambari")
+public class AmbariReportingTask extends AbstractReportingTask {
+
+ static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
+ .name("Metrics Collector URL")
+ .description("The URL of the Ambari Metrics Collector Service")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("http://localhost:6188/ws/v1/timeline/metrics")
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
+ .name("Application ID")
+ .description("The Application ID to be included in the metrics sent to Ambari")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Hostname")
+ .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("${hostname(true)}")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private volatile Client client;
+ private volatile JsonBuilderFactory factory;
+ private volatile VirtualMachineMetrics virtualMachineMetrics;
+
+ private final MetricsService metricsService = new MetricsService();
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>();
+ properties.add(METRICS_COLLECTOR_URL);
+ properties.add(APPLICATION_ID);
+ properties.add(HOSTNAME);
+ return properties;
+ }
+
+ @OnScheduled
+ public void setup(final ConfigurationContext context) throws IOException {
+ final Map config = Collections.emptyMap();
+ factory = Json.createBuilderFactory(config);
+ client = createClient();
+ virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+ }
+
+ // used for testing to allow tests to override the client
+ protected Client createClient() {
+ return ClientBuilder.newClient();
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context) {
+ final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+
+ final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
+ .evaluateAttributeExpressions().getValue();
+ final String applicationId = context.getProperty(APPLICATION_ID)
+ .evaluateAttributeExpressions().getValue();
+ final String hostname = context.getProperty(HOSTNAME)
+ .evaluateAttributeExpressions().getValue();
+
+ final long start = System.currentTimeMillis();
+
+ final Map statusMetrics = metricsService.getMetrics(status);
+ final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+
+ final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
+
+ final JsonObject metricsObject = metricsBuilder
+ .applicationId(applicationId)
+ .instanceId(status.getId())
+ .hostname(hostname)
+ .timestamp(start)
+ .addAllMetrics(statusMetrics)
+ .addAllMetrics(jvmMetrics)
+ .build();
+
+ final WebTarget metricsTarget = client.target(metricsCollectorUrl);
+ final Invocation.Builder invocation = metricsTarget.request();
+
+ final Entity entity = Entity.json(metricsObject.toString());
+ getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
+
+ final Response response = invocation.post(entity);
+ if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+ final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
+ getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis});
+ } else {
+ final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
+ getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
new file mode 100644
index 0000000000..8e234ce841
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ */
+public class MetricBuilder {
+
+ private final JsonBuilderFactory factory;
+
+ private String applicationId;
+ private String instanceId;
+ private String hostname;
+ private String timestamp;
+ private String metricName;
+ private String metricValue;
+
+ public MetricBuilder(final JsonBuilderFactory factory) {
+ this.factory = factory;
+ }
+
+ public MetricBuilder applicationId(final String applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
+ public MetricBuilder instanceId(final String instanceId) {
+ this.instanceId = instanceId;
+ return this;
+ }
+
+ public MetricBuilder hostname(final String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public MetricBuilder timestamp(final long timestamp) {
+ this.timestamp = String.valueOf(timestamp);
+ return this;
+ }
+
+ public MetricBuilder metricName(final String metricName) {
+ this.metricName = metricName;
+ return this;
+ }
+
+ public MetricBuilder metricValue(final String metricValue) {
+ this.metricValue = metricValue;
+ return this;
+ }
+
+ public JsonObject build() {
+ return factory.createObjectBuilder()
+ .add(MetricFields.METRIC_NAME, metricName)
+ .add(MetricFields.APP_ID, applicationId)
+ .add(MetricFields.INSTANCE_ID, instanceId)
+ .add(MetricFields.HOSTNAME, hostname)
+ .add(MetricFields.TIMESTAMP, timestamp)
+ .add(MetricFields.START_TIME, timestamp)
+ .add(MetricFields.METRICS,
+ factory.createObjectBuilder()
+ .add(String.valueOf(timestamp), metricValue)
+ ).build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
new file mode 100644
index 0000000000..1c1629c511
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.api;
+
+public interface MetricFields {
+
+ String METRIC_NAME = "metricname";
+ String APP_ID = "appid";
+ String INSTANCE_ID = "instanceid";
+ String HOSTNAME = "hostname";
+ String TIMESTAMP = "timestamp";
+ String START_TIME = "starttime";
+ String METRICS = "metrics";
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
new file mode 100644
index 0000000000..11b4db5b21
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds the overall JsonObject for the Metrics.
+ */
+public class MetricsBuilder {
+
+ static final String ROOT_JSON_ELEMENT = "metrics";
+
+ private final JsonBuilderFactory factory;
+
+ private long timestamp;
+ private String applicationId;
+ private String instanceId;
+ private String hostname;
+ private Map metrics = new HashMap<>();
+
+ public MetricsBuilder(final JsonBuilderFactory factory) {
+ this.factory = factory;
+ }
+
+ public MetricsBuilder applicationId(final String applicationId) {
+ this.applicationId = applicationId;
+ return this;
+ }
+
+ public MetricsBuilder instanceId(final String instanceId) {
+ this.instanceId = instanceId;
+ return this;
+ }
+
+ public MetricsBuilder hostname(final String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public MetricsBuilder timestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public MetricsBuilder metric(final String name, String value) {
+ this.metrics.put(name, value);
+ return this;
+ }
+
+ public MetricsBuilder addAllMetrics(final Map metrics) {
+ this.metrics.putAll(metrics);
+ return this;
+ }
+
+ public JsonObject build() {
+ // builds JsonObject for individual metrics
+ final MetricBuilder metricBuilder = new MetricBuilder(factory);
+ metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
+
+ final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
+
+ for (Map.Entry entry : metrics.entrySet()) {
+ metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
+ metricArrayBuilder.add(metricBuilder.build());
+ }
+
+ // add the array of metrics to a top-level json object
+ final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
+ metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
+ return metricsBuilder.build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
new file mode 100644
index 0000000000..f4e89cea14
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.metrics;
+
+/**
+ * The Metric names to send to Ambari.
+ */
+public interface MetricNames {
+
+ // NiFi Metrics
+ String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+ String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+ String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+ String BYTES_SENT = "BytesSentLast5Minutes";
+ String FLOW_FILES_QUEUED = "FlowFilesQueued";
+ String BYTES_QUEUED = "BytesQueued";
+ String BYTES_READ = "BytesReadLast5Minutes";
+ String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+ String ACTIVE_THREADS = "ActiveThreads";
+ String TOTAL_TASK_DURATION = "TotalTaskDurationSeconds";
+
+ // JVM Metrics
+ String JVM_UPTIME = "jvm.uptime";
+ String JVM_HEAP_USED = "jvm.heap_used";
+ String JVM_HEAP_USAGE = "jvm.heap_usage";
+ String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+ String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+ String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+ String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+ String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+ String JVM_THREAD_COUNT = "jvm.thread_count";
+ String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+ String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+ String JVM_GC_RUNS = "jvm.gc.runs";
+ String JVM_GC_TIME = "jvm.gc.time";
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
new file mode 100644
index 0000000000..8b1105156f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.metrics;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+ /**
+ * Generates a Map of metrics for a ProcessGroupStatus instance.
+ *
+ * @param status a ProcessGroupStatus to get metrics from
+ * @return a map of metrics for the given status
+ */
+ public Map getMetrics(ProcessGroupStatus status) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived()));
+ metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived()));
+ metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent()));
+ metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent()));
+ metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount()));
+ metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize()));
+ metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead()));
+ metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten()));
+ metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount()));
+ metrics.put(MetricNames.TOTAL_TASK_DURATION, String.valueOf(calculateProcessingNanos(status)));
+ return metrics;
+ }
+
+ /**
+ * Generates a Map of metrics for VirtualMachineMetrics.
+ *
+ * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
+ * @return a map of metrics from the given VirtualMachineStatus
+ */
+ public Map getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+ final Map metrics = new HashMap<>();
+ metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
+ metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
+ metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
+ metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
+ metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
+ metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
+ metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
+
+ for (Map.Entry entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
+ final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
+ switch(entry.getKey()) {
+ case BLOCKED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
+ break;
+ case RUNNABLE:
+ metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
+ break;
+ case TERMINATED:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
+ break;
+ case TIMED_WAITING:
+ metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
+ break;
+ default:
+ break;
+ }
+ }
+
+ for (Map.Entry entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
+ final String gcName = entry.getKey().replace(" ", "");
+ final long runs = entry.getValue().getRuns();
+ final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
+ metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
+ metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
+ }
+
+ return metrics;
+ }
+
+ // calculates the total processing time of all processors in nanos
+ protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+ long nanos = 0L;
+
+ for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+ nanos += procStats.getProcessingNanos();
+ }
+
+ for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+ nanos += calculateProcessingNanos(childGroupStatus);
+ }
+
+ return nanos;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000000..274710acdd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.reporting.ambari.AmbariReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html
new file mode 100644
index 0000000000..707de6ef25
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html
@@ -0,0 +1,57 @@
+
+
+
+
+
+ AmbariReportingTask
+
+
+
+
+
AmbariReportingTask
+
+
This ReportingTask sends the following metrics to Ambari:
+
+
FlowFilesReceivedLast5Minutes
+
BytesReceivedLast5Minutes
+
FlowFilesSentLast5Minutes
+
BytesSentLast5Minutes
+
FlowFilesQueued
+
BytesQueued
+
BytesReadLast5Minutes
+
BytesWrittenLast5Minutes
+
ActiveThreads
+
TotalTaskDurationSeconds
+
jvm.uptime
+
jvm.heap_used
+
jvm.heap_usage
+
jvm.non_heap_usage
+
jvm.thread_states.runnable
+
jvm.thread_states.blocked
+
jvm.thread_states.timed_waiting
+
jvm.thread_states.terminated
+
jvm.thread_count
+
jvm.daemon_thread_count
+
jvm.file_descriptor_usage
+
jvm.gc.runs
+
jvm.gc.time
+
+
+ In order to make use of these metrics in Ambari, a NIFI service must be created and installed
+ in Ambari. Please consult the Ambari and NiFi documentation for further details.
+
+
+
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
new file mode 100644
index 0000000000..ce5f8a64cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+
+public class TestAmbariReportingTask {
+
+ private ProcessGroupStatus status;
+
+ @Before
+ public void setup() {
+ status = new ProcessGroupStatus();
+ status.setId("1234");
+ status.setFlowFilesReceived(5);
+ status.setBytesReceived(10000);
+ status.setFlowFilesSent(10);
+ status.setBytesSent(20000);
+ status.setQueuedCount(100);
+ status.setQueuedContentSize(1024L);
+ status.setBytesRead(60000L);
+ status.setBytesWritten(80000L);
+ status.setActiveThreadCount(5);
+
+ // create a processor status with processing time
+ ProcessorStatus procStatus = new ProcessorStatus();
+ procStatus.setProcessingNanos(123456789);
+
+ Collection processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ // create a group status with processing time
+ ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+ groupStatus.setProcessorStatus(processorStatuses);
+
+ Collection groupStatuses = new ArrayList<>();
+ groupStatuses.add(groupStatus);
+ status.setProcessGroupStatus(groupStatuses);
+ }
+
+ @Test
+ public void testOnTrigger() throws InitializationException, IOException {
+ final String metricsUrl = "http://myambari:6188/ws/v1/timeline/metrics";
+ final String applicationId = "NIFI";
+ final String hostName = "localhost";
+
+ // create the jersey client mocks for handling the post
+ final Client client = Mockito.mock(Client.class);
+ final WebTarget target = Mockito.mock(WebTarget.class);
+ final Invocation.Builder builder = Mockito.mock(Invocation.Builder.class);
+
+ final Response response = Mockito.mock(Response.class);
+ Mockito.when(response.getStatus()).thenReturn(200);
+
+ Mockito.when(client.target(metricsUrl)).thenReturn(target);
+ Mockito.when(target.request()).thenReturn(builder);
+ Mockito.when(builder.post(Matchers.any(Entity.class))).thenReturn(response);
+
+ // mock the ReportingInitializationContext for initialize(...)
+ final ComponentLog logger = Mockito.mock(ComponentLog.class);
+ final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+ Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+ // mock the ConfigurationContext for setup(...)
+ final ConfigurationContext configurationContext = Mockito.mock(ConfigurationContext.class);
+
+ // mock the ReportingContext for onTrigger(...)
+ final ReportingContext context = Mockito.mock(ReportingContext.class);
+ Mockito.when(context.getProperty(AmbariReportingTask.METRICS_COLLECTOR_URL))
+ .thenReturn(new MockPropertyValue(metricsUrl, null));
+ Mockito.when(context.getProperty(AmbariReportingTask.APPLICATION_ID))
+ .thenReturn(new MockPropertyValue(applicationId, null));
+ Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
+ .thenReturn(new MockPropertyValue(hostName, null));
+
+ final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+ Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+ Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
+
+ // create a testable instance of the reporting task
+ final AmbariReportingTask task = new TestableAmbariReportingTask(client);
+ task.initialize(initContext);
+ task.setup(configurationContext);
+ task.onTrigger(context);
+ }
+
+ // override the creation of the client to provide a mock
+ private class TestableAmbariReportingTask extends AmbariReportingTask {
+
+ private Client testClient;
+
+ public TestableAmbariReportingTask(Client client) {
+ this.testClient = client;
+ }
+
+ @Override
+ protected Client createClient() {
+ return testClient;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
new file mode 100644
index 0000000000..cdaa453435
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.api;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestMetricsBuilder {
+
+ @Test
+ public void testBuildMetricsObject() {
+ final Map config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+
+ final String instanceId = "1234-5678-1234-5678";
+ final String applicationId = "NIFI";
+ final String hostname = "localhost";
+ final long timestamp = System.currentTimeMillis();
+
+ final Map metrics = new HashMap<>();
+ metrics.put("a", "1");
+ metrics.put("b", "2");
+
+ final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
+ final JsonObject metricsObject = metricsBuilder
+ .applicationId(applicationId)
+ .instanceId(instanceId)
+ .hostname(hostname)
+ .timestamp(timestamp)
+ .addAllMetrics(metrics)
+ .build();
+
+ final JsonArray metricsArray = metricsObject.getJsonArray("metrics");
+ Assert.assertNotNull(metricsArray);
+ Assert.assertEquals(2, metricsArray.size());
+
+ JsonObject firstMetric = metricsArray.getJsonObject(0);
+ if (!"a".equals(firstMetric.getString(MetricFields.METRIC_NAME))) {
+ firstMetric = metricsArray.getJsonObject(1);
+ }
+
+ Assert.assertEquals("a", firstMetric.getString(MetricFields.METRIC_NAME));
+ Assert.assertEquals(applicationId, firstMetric.getString(MetricFields.APP_ID));
+ Assert.assertEquals(instanceId, firstMetric.getString(MetricFields.INSTANCE_ID));
+ Assert.assertEquals(hostname, firstMetric.getString(MetricFields.HOSTNAME));
+ Assert.assertEquals(String.valueOf(timestamp), firstMetric.getString(MetricFields.TIMESTAMP));
+
+ final JsonObject firstMetricValues = firstMetric.getJsonObject("metrics");
+ Assert.assertEquals("1", firstMetricValues.getString("" + timestamp));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
new file mode 100644
index 0000000000..3f6be06566
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.ambari.metrics;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+public class TestMetricsService {
+
+ @Test
+ public void testGetProcessGroupStatusMetrics() {
+ ProcessGroupStatus status = new ProcessGroupStatus();
+ status.setId("1234");
+ status.setFlowFilesReceived(5);
+ status.setBytesReceived(10000);
+ status.setFlowFilesSent(10);
+ status.setBytesSent(20000);
+ status.setQueuedCount(100);
+ status.setQueuedContentSize(1024L);
+ status.setBytesRead(60000L);
+ status.setBytesWritten(80000L);
+ status.setActiveThreadCount(5);
+
+ // create a processor status with processing time
+ ProcessorStatus procStatus = new ProcessorStatus();
+ procStatus.setProcessingNanos(123456789);
+
+ Collection processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ // create a group status with processing time
+ ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+ groupStatus.setProcessorStatus(processorStatuses);
+
+ Collection groupStatuses = new ArrayList<>();
+ groupStatuses.add(groupStatus);
+ status.setProcessGroupStatus(groupStatuses);
+
+ final MetricsService service = new MetricsService();
+
+ final Map metrics = service.getMetrics(status);
+
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
+ Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
+ Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
+ Assert.assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION));
+ }
+
+ @Test
+ public void testGetVirtualMachineMetrics() {
+ final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+ final MetricsService service = new MetricsService();
+
+ final Map metrics = service.getMetrics(virtualMachineMetrics);
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
+ Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/pom.xml
new file mode 100644
index 0000000000..838ca89f5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-ambari-bundle/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-nar-bundles
+ 0.3.0-SNAPSHOT
+
+
+ nifi-ambari-bundle
+ pom
+
+
+ nifi-ambari-reporting-task
+ nifi-ambari-nar
+
+
+
+
+
+ org.glassfish.jersey.core
+ jersey-client
+ 2.19
+
+
+
+
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index c82b5bd8fb..fdcececcaa 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -42,6 +42,7 @@
nifi-language-translation-bundlenifi-mongodb-bundlenifi-flume-bundle
+ nifi-ambari-bundle
diff --git a/pom.xml b/pom.xml
index 65aaa044e7..7cbc76cca7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
3.2.7.RELEASE1.192.6.0
+ 2.2.0
@@ -501,7 +502,12 @@
com.yammer.metricsmetrics-ganglia
- 2.2.0
+ ${yammer.metrics.version}
+
+
+ com.yammer.metrics
+ metrics-core
+ ${yammer.metrics.version}javax.jms
@@ -877,6 +883,12 @@
0.3.0-SNAPSHOTnar
+
+ org.apache.nifi
+ nifi-ambari-nar
+ 0.3.0-SNAPSHOT
+ nar
+ org.apache.nifinifi-properties