From 1c34478eb8304c81919e2929ca6d0889fe93006b Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 17 Aug 2015 08:53:46 -0400 Subject: [PATCH] NIFI-790 Create reporting task to deliver metrics to Apache Ambari - Adding NOTICE for nifi-ambari-nar, and fixing formatting issues in nifi-ambari-reporting-task pom - Addressing review feedback: updating assembly NOTICE, fixing unit test, and minor clean up - Adding additionalDetails.html --- nifi-assembly/NOTICE | 14 ++ nifi-assembly/pom.xml | 5 + .../nifi-ambari-nar/pom.xml | 33 ++++ .../src/main/resources/META-INF/NOTICE | 46 ++++++ .../nifi-ambari-reporting-task/pom.xml | 71 ++++++++ .../reporting/ambari/AmbariReportingTask.java | 151 ++++++++++++++++++ .../reporting/ambari/api/MetricBuilder.java | 84 ++++++++++ .../reporting/ambari/api/MetricFields.java | 29 ++++ .../reporting/ambari/api/MetricsBuilder.java | 93 +++++++++++ .../reporting/ambari/metrics/MetricNames.java | 51 ++++++ .../ambari/metrics/MetricsService.java | 115 +++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 15 ++ .../additionalDetails.html | 57 +++++++ .../ambari/TestAmbariReportingTask.java | 139 ++++++++++++++++ .../ambari/api/TestMetricsBuilder.java | 74 +++++++++ .../ambari/metrics/TestMetricsService.java | 96 +++++++++++ nifi-nar-bundles/nifi-ambari-bundle/pom.xml | 41 +++++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 14 +- 19 files changed, 1128 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java create mode 100644 nifi-nar-bundles/nifi-ambari-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index f972952af1..4efa96f2bb 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -710,7 +710,9 @@ Common Development and Distribution License 1.1 The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/) (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/) (CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/) (CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.19 - https://jersey.java.net/jersey-spring/) (CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.19 - https://jersey.java.net/jersey-servlet/) @@ -729,6 +731,18 @@ The following binary components are provided under the Common Development and Di (CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages(TM) Standard Tag Library API (javax.servlet.jsp.jstl:javax.servlet.jsp.jstl-api:jar:1.2.1 - http://jcp.org/en/jsr/detail?id=52) (CDDL 1.1) (GPL2 w/ CPE) Java Servlet API (javax.servlet:javax.servlet-api:jar:3.1.0 - http://servlet-spec.java.net) (CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) + (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator) + (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250) + (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api) + (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator) + (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils) + (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged) + (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject) + (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/) + ************************ Common Development and Distribution License 1.0 diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 60a6545f33..1f1522787d 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -212,6 +212,11 @@ language governing permissions and limitations under the License. --> nifi-aws-nar nar + + org.apache.nifi + nifi-ambari-nar + nar + diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml new file mode 100644 index 0000000000..ec31d4f672 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + org.apache.nifi + nifi-ambari-bundle + 0.3.0-SNAPSHOT + + + nifi-ambari-nar + nar + + + + org.apache.nifi + nifi-ambari-reporting-task + 0.3.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..03ccfd8384 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,46 @@ +nifi-ambari-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Yammer Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2012 Coda Hale and Yammer, Inc. + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released + with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) + (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator) + (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250) + (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api) + (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator) + (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils) + (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged) + (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject) + (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/) + diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml new file mode 100644 index 0000000000..ac5a9370d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-ambari-bundle + 0.3.0-SNAPSHOT + + + nifi-ambari-reporting-task + Publishes NiFi metrics to Ambari + + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish + javax.json + 1.0.4 + + + javax.json + javax.json-api + 1.0 + + + com.yammer.metrics + metrics-core + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-utils + + + + org.apache.nifi + nifi-mock + test + + + org.mockito + mockito-all + test + + + diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java new file mode 100644 index 0000000000..cff0b48924 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari; + +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ambari.api.MetricsBuilder; +import org.apache.nifi.reporting.ambari.metrics.MetricsService; + + +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Tags({"reporting", "ambari", "metrics"}) +@CapabilityDescription("Publishes metrics from NiFi to Ambari") +public class AmbariReportingTask extends AbstractReportingTask { + + static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder() + .name("Metrics Collector URL") + .description("The URL of the Ambari Metrics Collector Service") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("http://localhost:6188/ws/v1/timeline/metrics") + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() + .name("Application ID") + .description("The Application ID to be included in the metrics sent to Ambari") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${hostname(true)}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private volatile Client client; + private volatile JsonBuilderFactory factory; + private volatile VirtualMachineMetrics virtualMachineMetrics; + + private final MetricsService metricsService = new MetricsService(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(METRICS_COLLECTOR_URL); + properties.add(APPLICATION_ID); + properties.add(HOSTNAME); + return properties; + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + final Map config = Collections.emptyMap(); + factory = Json.createBuilderFactory(config); + client = createClient(); + virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + } + + // used for testing to allow tests to override the client + protected Client createClient() { + return ClientBuilder.newClient(); + } + + @Override + public void onTrigger(final ReportingContext context) { + final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); + + final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL) + .evaluateAttributeExpressions().getValue(); + final String applicationId = context.getProperty(APPLICATION_ID) + .evaluateAttributeExpressions().getValue(); + final String hostname = context.getProperty(HOSTNAME) + .evaluateAttributeExpressions().getValue(); + + final long start = System.currentTimeMillis(); + + final Map statusMetrics = metricsService.getMetrics(status); + final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); + + final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); + + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(status.getId()) + .hostname(hostname) + .timestamp(start) + .addAllMetrics(statusMetrics) + .addAllMetrics(jvmMetrics) + .build(); + + final WebTarget metricsTarget = client.target(metricsCollectorUrl); + final Invocation.Builder invocation = metricsTarget.request(); + + final Entity entity = Entity.json(metricsObject.toString()); + getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()}); + + final Response response = invocation.post(entity); + if (response.getStatus() == Response.Status.OK.getStatusCode()) { + final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start); + getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis}); + } else { + final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error"; + getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity}); + } + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java new file mode 100644 index 0000000000..8e234ce841 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.api; + +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +/** + * Builds the JsonObject for an individual metric. + */ +public class MetricBuilder { + + private final JsonBuilderFactory factory; + + private String applicationId; + private String instanceId; + private String hostname; + private String timestamp; + private String metricName; + private String metricValue; + + public MetricBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricBuilder timestamp(final long timestamp) { + this.timestamp = String.valueOf(timestamp); + return this; + } + + public MetricBuilder metricName(final String metricName) { + this.metricName = metricName; + return this; + } + + public MetricBuilder metricValue(final String metricValue) { + this.metricValue = metricValue; + return this; + } + + public JsonObject build() { + return factory.createObjectBuilder() + .add(MetricFields.METRIC_NAME, metricName) + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.INSTANCE_ID, instanceId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.TIMESTAMP, timestamp) + .add(MetricFields.START_TIME, timestamp) + .add(MetricFields.METRICS, + factory.createObjectBuilder() + .add(String.valueOf(timestamp), metricValue) + ).build(); + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java new file mode 100644 index 0000000000..1c1629c511 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.api; + +public interface MetricFields { + + String METRIC_NAME = "metricname"; + String APP_ID = "appid"; + String INSTANCE_ID = "instanceid"; + String HOSTNAME = "hostname"; + String TIMESTAMP = "timestamp"; + String START_TIME = "starttime"; + String METRICS = "metrics"; + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java new file mode 100644 index 0000000000..11b4db5b21 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.api; + +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.util.HashMap; +import java.util.Map; + +/** + * Builds the overall JsonObject for the Metrics. + */ +public class MetricsBuilder { + + static final String ROOT_JSON_ELEMENT = "metrics"; + + private final JsonBuilderFactory factory; + + private long timestamp; + private String applicationId; + private String instanceId; + private String hostname; + private Map metrics = new HashMap<>(); + + public MetricsBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricsBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricsBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricsBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricsBuilder timestamp(final long timestamp) { + this.timestamp = timestamp; + return this; + } + + public MetricsBuilder metric(final String name, String value) { + this.metrics.put(name, value); + return this; + } + + public MetricsBuilder addAllMetrics(final Map metrics) { + this.metrics.putAll(metrics); + return this; + } + + public JsonObject build() { + // builds JsonObject for individual metrics + final MetricBuilder metricBuilder = new MetricBuilder(factory); + metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); + + final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder(); + + for (Map.Entry entry : metrics.entrySet()) { + metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); + metricArrayBuilder.add(metricBuilder.build()); + } + + // add the array of metrics to a top-level json object + final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder(); + metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder); + return metricsBuilder.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java new file mode 100644 index 0000000000..f4e89cea14 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.metrics; + +/** + * The Metric names to send to Ambari. + */ +public interface MetricNames { + + // NiFi Metrics + String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; + String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; + String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; + String BYTES_SENT = "BytesSentLast5Minutes"; + String FLOW_FILES_QUEUED = "FlowFilesQueued"; + String BYTES_QUEUED = "BytesQueued"; + String BYTES_READ = "BytesReadLast5Minutes"; + String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; + String ACTIVE_THREADS = "ActiveThreads"; + String TOTAL_TASK_DURATION = "TotalTaskDurationSeconds"; + + // JVM Metrics + String JVM_UPTIME = "jvm.uptime"; + String JVM_HEAP_USED = "jvm.heap_used"; + String JVM_HEAP_USAGE = "jvm.heap_usage"; + String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage"; + String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable"; + String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked"; + String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting"; + String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated"; + String JVM_THREAD_COUNT = "jvm.thread_count"; + String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count"; + String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage"; + String JVM_GC_RUNS = "jvm.gc.runs"; + String JVM_GC_TIME = "jvm.gc.time"; + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java new file mode 100644 index 0000000000..8b1105156f --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.metrics; + +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * A service used to produce key/value metrics based on a given input. + */ +public class MetricsService { + + /** + * Generates a Map of metrics for a ProcessGroupStatus instance. + * + * @param status a ProcessGroupStatus to get metrics from + * @return a map of metrics for the given status + */ + public Map getMetrics(ProcessGroupStatus status) { + final Map metrics = new HashMap<>(); + metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived())); + metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived())); + metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent())); + metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent())); + metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount())); + metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize())); + metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead())); + metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten())); + metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount())); + metrics.put(MetricNames.TOTAL_TASK_DURATION, String.valueOf(calculateProcessingNanos(status))); + return metrics; + } + + /** + * Generates a Map of metrics for VirtualMachineMetrics. + * + * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from + * @return a map of metrics from the given VirtualMachineStatus + */ + public Map getMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime())); + metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed())); + metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage())); + metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage())); + metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount())); + metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount())); + metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage())); + + for (Map.Entry entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { + final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); + switch(entry.getKey()) { + case BLOCKED: + metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue)); + break; + case RUNNABLE: + metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue)); + break; + case TERMINATED: + metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue)); + break; + case TIMED_WAITING: + metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue)); + break; + default: + break; + } + } + + for (Map.Entry entry : virtualMachineMetrics.garbageCollectors().entrySet()) { + final String gcName = entry.getKey().replace(" ", ""); + final long runs = entry.getValue().getRuns(); + final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); + metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs)); + metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS)); + } + + return metrics; + } + + // calculates the total processing time of all processors in nanos + protected long calculateProcessingNanos(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStats : status.getProcessorStatus()) { + nanos += procStats.getProcessingNanos(); + } + + for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + nanos += calculateProcessingNanos(childGroupStatus); + } + + return nanos; + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 0000000000..274710acdd --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.reporting.ambari.AmbariReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html new file mode 100644 index 0000000000..707de6ef25 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html @@ -0,0 +1,57 @@ + + + + + + AmbariReportingTask + + + + +

AmbariReportingTask

+ +

This ReportingTask sends the following metrics to Ambari:

+
    +
  • FlowFilesReceivedLast5Minutes
  • +
  • BytesReceivedLast5Minutes
  • +
  • FlowFilesSentLast5Minutes
  • +
  • BytesSentLast5Minutes
  • +
  • FlowFilesQueued
  • +
  • BytesQueued
  • +
  • BytesReadLast5Minutes
  • +
  • BytesWrittenLast5Minutes
  • +
  • ActiveThreads
  • +
  • TotalTaskDurationSeconds
  • +
  • jvm.uptime
  • +
  • jvm.heap_used
  • +
  • jvm.heap_usage
  • +
  • jvm.non_heap_usage
  • +
  • jvm.thread_states.runnable
  • +
  • jvm.thread_states.blocked
  • +
  • jvm.thread_states.timed_waiting
  • +
  • jvm.thread_states.terminated
  • +
  • jvm.thread_count
  • +
  • jvm.daemon_thread_count
  • +
  • jvm.file_descriptor_usage
  • +
  • jvm.gc.runs
  • +
  • jvm.gc.time
  • +
+

+ In order to make use of these metrics in Ambari, a NIFI service must be created and installed + in Ambari. Please consult the Ambari and NiFi documentation for further details. +

+ + diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java new file mode 100644 index 0000000000..ce5f8a64cf --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari; + +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; + +public class TestAmbariReportingTask { + + private ProcessGroupStatus status; + + @Before + public void setup() { + status = new ProcessGroupStatus(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + + // create a processor status with processing time + ProcessorStatus procStatus = new ProcessorStatus(); + procStatus.setProcessingNanos(123456789); + + Collection processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + // create a group status with processing time + ProcessGroupStatus groupStatus = new ProcessGroupStatus(); + groupStatus.setProcessorStatus(processorStatuses); + + Collection groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus); + status.setProcessGroupStatus(groupStatuses); + } + + @Test + public void testOnTrigger() throws InitializationException, IOException { + final String metricsUrl = "http://myambari:6188/ws/v1/timeline/metrics"; + final String applicationId = "NIFI"; + final String hostName = "localhost"; + + // create the jersey client mocks for handling the post + final Client client = Mockito.mock(Client.class); + final WebTarget target = Mockito.mock(WebTarget.class); + final Invocation.Builder builder = Mockito.mock(Invocation.Builder.class); + + final Response response = Mockito.mock(Response.class); + Mockito.when(response.getStatus()).thenReturn(200); + + Mockito.when(client.target(metricsUrl)).thenReturn(target); + Mockito.when(target.request()).thenReturn(builder); + Mockito.when(builder.post(Matchers.any(Entity.class))).thenReturn(response); + + // mock the ReportingInitializationContext for initialize(...) + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + // mock the ConfigurationContext for setup(...) + final ConfigurationContext configurationContext = Mockito.mock(ConfigurationContext.class); + + // mock the ReportingContext for onTrigger(...) + final ReportingContext context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getProperty(AmbariReportingTask.METRICS_COLLECTOR_URL)) + .thenReturn(new MockPropertyValue(metricsUrl, null)); + Mockito.when(context.getProperty(AmbariReportingTask.APPLICATION_ID)) + .thenReturn(new MockPropertyValue(applicationId, null)); + Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME)) + .thenReturn(new MockPropertyValue(hostName, null)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); + + // create a testable instance of the reporting task + final AmbariReportingTask task = new TestableAmbariReportingTask(client); + task.initialize(initContext); + task.setup(configurationContext); + task.onTrigger(context); + } + + // override the creation of the client to provide a mock + private class TestableAmbariReportingTask extends AmbariReportingTask { + + private Client testClient; + + public TestableAmbariReportingTask(Client client) { + this.testClient = client; + } + + @Override + protected Client createClient() { + return testClient; + } + } +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java new file mode 100644 index 0000000000..cdaa453435 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.api; + +import org.junit.Assert; +import org.junit.Test; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TestMetricsBuilder { + + @Test + public void testBuildMetricsObject() { + final Map config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + + final String instanceId = "1234-5678-1234-5678"; + final String applicationId = "NIFI"; + final String hostname = "localhost"; + final long timestamp = System.currentTimeMillis(); + + final Map metrics = new HashMap<>(); + metrics.put("a", "1"); + metrics.put("b", "2"); + + final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(instanceId) + .hostname(hostname) + .timestamp(timestamp) + .addAllMetrics(metrics) + .build(); + + final JsonArray metricsArray = metricsObject.getJsonArray("metrics"); + Assert.assertNotNull(metricsArray); + Assert.assertEquals(2, metricsArray.size()); + + JsonObject firstMetric = metricsArray.getJsonObject(0); + if (!"a".equals(firstMetric.getString(MetricFields.METRIC_NAME))) { + firstMetric = metricsArray.getJsonObject(1); + } + + Assert.assertEquals("a", firstMetric.getString(MetricFields.METRIC_NAME)); + Assert.assertEquals(applicationId, firstMetric.getString(MetricFields.APP_ID)); + Assert.assertEquals(instanceId, firstMetric.getString(MetricFields.INSTANCE_ID)); + Assert.assertEquals(hostname, firstMetric.getString(MetricFields.HOSTNAME)); + Assert.assertEquals(String.valueOf(timestamp), firstMetric.getString(MetricFields.TIMESTAMP)); + + final JsonObject firstMetricValues = firstMetric.getJsonObject("metrics"); + Assert.assertEquals("1", firstMetricValues.getString("" + timestamp)); + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java new file mode 100644 index 0000000000..3f6be06566 --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.ambari.metrics; + +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +public class TestMetricsService { + + @Test + public void testGetProcessGroupStatusMetrics() { + ProcessGroupStatus status = new ProcessGroupStatus(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + + // create a processor status with processing time + ProcessorStatus procStatus = new ProcessorStatus(); + procStatus.setProcessingNanos(123456789); + + Collection processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + // create a group status with processing time + ProcessGroupStatus groupStatus = new ProcessGroupStatus(); + groupStatus.setProcessorStatus(processorStatuses); + + Collection groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus); + status.setProcessGroupStatus(groupStatuses); + + final MetricsService service = new MetricsService(); + + final Map metrics = service.getMetrics(status); + + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN)); + Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS)); + Assert.assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION)); + } + + @Test + public void testGetVirtualMachineMetrics() { + final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + final MetricsService service = new MetricsService(); + + final Map metrics = service.getMetrics(virtualMachineMetrics); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE)); + } + +} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/pom.xml new file mode 100644 index 0000000000..838ca89f5b --- /dev/null +++ b/nifi-nar-bundles/nifi-ambari-bundle/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 0.3.0-SNAPSHOT + + + nifi-ambari-bundle + pom + + + nifi-ambari-reporting-task + nifi-ambari-nar + + + + + + org.glassfish.jersey.core + jersey-client + 2.19 + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index c82b5bd8fb..fdcececcaa 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -42,6 +42,7 @@ nifi-language-translation-bundle nifi-mongodb-bundle nifi-flume-bundle + nifi-ambari-bundle diff --git a/pom.xml b/pom.xml index 65aaa044e7..7cbc76cca7 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ 3.2.7.RELEASE 1.19 2.6.0 + 2.2.0
@@ -501,7 +502,12 @@ com.yammer.metrics metrics-ganglia - 2.2.0 + ${yammer.metrics.version} + + + com.yammer.metrics + metrics-core + ${yammer.metrics.version} javax.jms @@ -877,6 +883,12 @@ 0.3.0-SNAPSHOT nar + + org.apache.nifi + nifi-ambari-nar + 0.3.0-SNAPSHOT + nar + org.apache.nifi nifi-properties