mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-790-NEW'
This commit is contained in:
commit
a264c49d80
|
@ -710,7 +710,9 @@ Common Development and Distribution License 1.1
|
|||
|
||||
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
|
||||
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-spring (com.sun.jersey:jersey-spring:jar:1.19 - https://jersey.java.net/jersey-spring/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-servlet (com.sun.jersey:jersey-servlet:jar:1.19 - https://jersey.java.net/jersey-servlet/)
|
||||
|
@ -729,6 +731,18 @@ The following binary components are provided under the Common Development and Di
|
|||
(CDDL 1.1) (GPL2 w/ CPE) JavaServer Pages(TM) Standard Tag Library API (javax.servlet.jsp.jstl:javax.servlet.jsp.jstl-api:jar:1.2.1 - http://jcp.org/en/jsr/detail?id=52)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) Java Servlet API (javax.servlet:javax.servlet-api:jar:3.1.0 - http://servlet-spec.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
|
||||
|
||||
|
||||
************************
|
||||
Common Development and Distribution License 1.0
|
||||
|
|
|
@ -212,6 +212,11 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-aws-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ambari-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ambari-bundle</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-ambari-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ambari-reporting-task</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,46 @@
|
|||
nifi-ambari-nar
|
||||
Copyright 2015 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
******************
|
||||
Apache Software License v2
|
||||
******************
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Yammer Metrics
|
||||
The following NOTICE information applies:
|
||||
Metrics
|
||||
Copyright 2010-2012 Coda Hale and Yammer, Inc.
|
||||
|
||||
This product includes software developed by Coda Hale and Yammer, Inc.
|
||||
|
||||
This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
|
||||
with the following comments:
|
||||
|
||||
Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
Expert Group and released to the public domain, as explained at
|
||||
http://creativecommons.org/publicdomain/zero/1.0/
|
||||
|
||||
************************
|
||||
Common Development and Distribution License 1.1
|
||||
************************
|
||||
|
||||
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
|
||||
|
||||
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
|
||||
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)
|
||||
|
|
@ -0,0 +1,71 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ambari-bundle</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-ambari-reporting-task</artifactId>
|
||||
<description>Publishes NiFi metrics to Ambari</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.core</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish</groupId>
|
||||
<artifactId>javax.json</artifactId>
|
||||
<version>1.0.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.json</groupId>
|
||||
<artifactId>javax.json-api</artifactId>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<!-- test dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari;
|
||||
|
||||
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.AbstractReportingTask;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
|
||||
import org.apache.nifi.reporting.ambari.metrics.MetricsService;
|
||||
|
||||
|
||||
import javax.json.Json;
|
||||
import javax.json.JsonBuilderFactory;
|
||||
import javax.json.JsonObject;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.client.Invocation;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tags({"reporting", "ambari", "metrics"})
|
||||
@CapabilityDescription("Publishes metrics from NiFi to Ambari")
|
||||
public class AmbariReportingTask extends AbstractReportingTask {
|
||||
|
||||
static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
|
||||
.name("Metrics Collector URL")
|
||||
.description("The URL of the Ambari Metrics Collector Service")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("http://localhost:6188/ws/v1/timeline/metrics")
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
|
||||
.name("Application ID")
|
||||
.description("The Application ID to be included in the metrics sent to Ambari")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("nifi")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("Hostname")
|
||||
.description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${hostname(true)}")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private volatile Client client;
|
||||
private volatile JsonBuilderFactory factory;
|
||||
private volatile VirtualMachineMetrics virtualMachineMetrics;
|
||||
|
||||
private final MetricsService metricsService = new MetricsService();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(METRICS_COLLECTOR_URL);
|
||||
properties.add(APPLICATION_ID);
|
||||
properties.add(HOSTNAME);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setup(final ConfigurationContext context) throws IOException {
|
||||
final Map<String, ?> config = Collections.emptyMap();
|
||||
factory = Json.createBuilderFactory(config);
|
||||
client = createClient();
|
||||
virtualMachineMetrics = VirtualMachineMetrics.getInstance();
|
||||
}
|
||||
|
||||
// used for testing to allow tests to override the client
|
||||
protected Client createClient() {
|
||||
return ClientBuilder.newClient();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ReportingContext context) {
|
||||
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
|
||||
|
||||
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
|
||||
.evaluateAttributeExpressions().getValue();
|
||||
final String applicationId = context.getProperty(APPLICATION_ID)
|
||||
.evaluateAttributeExpressions().getValue();
|
||||
final String hostname = context.getProperty(HOSTNAME)
|
||||
.evaluateAttributeExpressions().getValue();
|
||||
|
||||
final long start = System.currentTimeMillis();
|
||||
|
||||
final Map<String,String> statusMetrics = metricsService.getMetrics(status);
|
||||
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
|
||||
|
||||
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
|
||||
|
||||
final JsonObject metricsObject = metricsBuilder
|
||||
.applicationId(applicationId)
|
||||
.instanceId(status.getId())
|
||||
.hostname(hostname)
|
||||
.timestamp(start)
|
||||
.addAllMetrics(statusMetrics)
|
||||
.addAllMetrics(jvmMetrics)
|
||||
.build();
|
||||
|
||||
final WebTarget metricsTarget = client.target(metricsCollectorUrl);
|
||||
final Invocation.Builder invocation = metricsTarget.request();
|
||||
|
||||
final Entity<String> entity = Entity.json(metricsObject.toString());
|
||||
getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
|
||||
|
||||
final Response response = invocation.post(entity);
|
||||
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
|
||||
final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
|
||||
getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis});
|
||||
} else {
|
||||
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
|
||||
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.api;
|
||||
|
||||
import javax.json.JsonBuilderFactory;
|
||||
import javax.json.JsonObject;
|
||||
|
||||
/**
|
||||
* Builds the JsonObject for an individual metric.
|
||||
*/
|
||||
public class MetricBuilder {
|
||||
|
||||
private final JsonBuilderFactory factory;
|
||||
|
||||
private String applicationId;
|
||||
private String instanceId;
|
||||
private String hostname;
|
||||
private String timestamp;
|
||||
private String metricName;
|
||||
private String metricValue;
|
||||
|
||||
public MetricBuilder(final JsonBuilderFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public MetricBuilder applicationId(final String applicationId) {
|
||||
this.applicationId = applicationId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricBuilder instanceId(final String instanceId) {
|
||||
this.instanceId = instanceId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricBuilder hostname(final String hostname) {
|
||||
this.hostname = hostname;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricBuilder timestamp(final long timestamp) {
|
||||
this.timestamp = String.valueOf(timestamp);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricBuilder metricName(final String metricName) {
|
||||
this.metricName = metricName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricBuilder metricValue(final String metricValue) {
|
||||
this.metricValue = metricValue;
|
||||
return this;
|
||||
}
|
||||
|
||||
public JsonObject build() {
|
||||
return factory.createObjectBuilder()
|
||||
.add(MetricFields.METRIC_NAME, metricName)
|
||||
.add(MetricFields.APP_ID, applicationId)
|
||||
.add(MetricFields.INSTANCE_ID, instanceId)
|
||||
.add(MetricFields.HOSTNAME, hostname)
|
||||
.add(MetricFields.TIMESTAMP, timestamp)
|
||||
.add(MetricFields.START_TIME, timestamp)
|
||||
.add(MetricFields.METRICS,
|
||||
factory.createObjectBuilder()
|
||||
.add(String.valueOf(timestamp), metricValue)
|
||||
).build();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.api;
|
||||
|
||||
public interface MetricFields {
|
||||
|
||||
String METRIC_NAME = "metricname";
|
||||
String APP_ID = "appid";
|
||||
String INSTANCE_ID = "instanceid";
|
||||
String HOSTNAME = "hostname";
|
||||
String TIMESTAMP = "timestamp";
|
||||
String START_TIME = "starttime";
|
||||
String METRICS = "metrics";
|
||||
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.api;
|
||||
|
||||
import javax.json.JsonArrayBuilder;
|
||||
import javax.json.JsonBuilderFactory;
|
||||
import javax.json.JsonObject;
|
||||
import javax.json.JsonObjectBuilder;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Builds the overall JsonObject for the Metrics.
|
||||
*/
|
||||
public class MetricsBuilder {
|
||||
|
||||
static final String ROOT_JSON_ELEMENT = "metrics";
|
||||
|
||||
private final JsonBuilderFactory factory;
|
||||
|
||||
private long timestamp;
|
||||
private String applicationId;
|
||||
private String instanceId;
|
||||
private String hostname;
|
||||
private Map<String,String> metrics = new HashMap<>();
|
||||
|
||||
public MetricsBuilder(final JsonBuilderFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
public MetricsBuilder applicationId(final String applicationId) {
|
||||
this.applicationId = applicationId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricsBuilder instanceId(final String instanceId) {
|
||||
this.instanceId = instanceId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricsBuilder hostname(final String hostname) {
|
||||
this.hostname = hostname;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricsBuilder timestamp(final long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricsBuilder metric(final String name, String value) {
|
||||
this.metrics.put(name, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
|
||||
this.metrics.putAll(metrics);
|
||||
return this;
|
||||
}
|
||||
|
||||
public JsonObject build() {
|
||||
// builds JsonObject for individual metrics
|
||||
final MetricBuilder metricBuilder = new MetricBuilder(factory);
|
||||
metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
|
||||
|
||||
final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
|
||||
|
||||
for (Map.Entry<String,String> entry : metrics.entrySet()) {
|
||||
metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
|
||||
metricArrayBuilder.add(metricBuilder.build());
|
||||
}
|
||||
|
||||
// add the array of metrics to a top-level json object
|
||||
final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
|
||||
metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
|
||||
return metricsBuilder.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.metrics;
|
||||
|
||||
/**
|
||||
* The Metric names to send to Ambari.
|
||||
*/
|
||||
public interface MetricNames {
|
||||
|
||||
// NiFi Metrics
|
||||
String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
|
||||
String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
|
||||
String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
|
||||
String BYTES_SENT = "BytesSentLast5Minutes";
|
||||
String FLOW_FILES_QUEUED = "FlowFilesQueued";
|
||||
String BYTES_QUEUED = "BytesQueued";
|
||||
String BYTES_READ = "BytesReadLast5Minutes";
|
||||
String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
|
||||
String ACTIVE_THREADS = "ActiveThreads";
|
||||
String TOTAL_TASK_DURATION = "TotalTaskDurationSeconds";
|
||||
|
||||
// JVM Metrics
|
||||
String JVM_UPTIME = "jvm.uptime";
|
||||
String JVM_HEAP_USED = "jvm.heap_used";
|
||||
String JVM_HEAP_USAGE = "jvm.heap_usage";
|
||||
String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
|
||||
String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
|
||||
String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
|
||||
String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
|
||||
String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
|
||||
String JVM_THREAD_COUNT = "jvm.thread_count";
|
||||
String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
|
||||
String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
|
||||
String JVM_GC_RUNS = "jvm.gc.runs";
|
||||
String JVM_GC_TIME = "jvm.gc.time";
|
||||
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.metrics;
|
||||
|
||||
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A service used to produce key/value metrics based on a given input.
|
||||
*/
|
||||
public class MetricsService {
|
||||
|
||||
/**
|
||||
* Generates a Map of metrics for a ProcessGroupStatus instance.
|
||||
*
|
||||
* @param status a ProcessGroupStatus to get metrics from
|
||||
* @return a map of metrics for the given status
|
||||
*/
|
||||
public Map<String,String> getMetrics(ProcessGroupStatus status) {
|
||||
final Map<String,String> metrics = new HashMap<>();
|
||||
metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived()));
|
||||
metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived()));
|
||||
metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent()));
|
||||
metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent()));
|
||||
metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount()));
|
||||
metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize()));
|
||||
metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead()));
|
||||
metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten()));
|
||||
metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount()));
|
||||
metrics.put(MetricNames.TOTAL_TASK_DURATION, String.valueOf(calculateProcessingNanos(status)));
|
||||
return metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a Map of metrics for VirtualMachineMetrics.
|
||||
*
|
||||
* @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
|
||||
* @return a map of metrics from the given VirtualMachineStatus
|
||||
*/
|
||||
public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
|
||||
final Map<String,String> metrics = new HashMap<>();
|
||||
metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
|
||||
metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
|
||||
metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
|
||||
metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
|
||||
metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
|
||||
metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
|
||||
metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
|
||||
|
||||
for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
|
||||
final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
|
||||
switch(entry.getKey()) {
|
||||
case BLOCKED:
|
||||
metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
|
||||
break;
|
||||
case RUNNABLE:
|
||||
metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
|
||||
break;
|
||||
case TERMINATED:
|
||||
metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
|
||||
break;
|
||||
case TIMED_WAITING:
|
||||
metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
|
||||
final String gcName = entry.getKey().replace(" ", "");
|
||||
final long runs = entry.getValue().getRuns();
|
||||
final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
|
||||
metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
|
||||
metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
|
||||
}
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
// calculates the total processing time of all processors in nanos
|
||||
protected long calculateProcessingNanos(final ProcessGroupStatus status) {
|
||||
long nanos = 0L;
|
||||
|
||||
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
|
||||
nanos += procStats.getProcessingNanos();
|
||||
}
|
||||
|
||||
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
|
||||
nanos += calculateProcessingNanos(childGroupStatus);
|
||||
}
|
||||
|
||||
return nanos;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.reporting.ambari.AmbariReportingTask
|
|
@ -0,0 +1,57 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<title>AmbariReportingTask</title>
|
||||
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<h2>AmbariReportingTask</h2>
|
||||
|
||||
<p>This ReportingTask sends the following metrics to Ambari:</p>
|
||||
<ul>
|
||||
<li>FlowFilesReceivedLast5Minutes</li>
|
||||
<li>BytesReceivedLast5Minutes</li>
|
||||
<li>FlowFilesSentLast5Minutes</li>
|
||||
<li>BytesSentLast5Minutes</li>
|
||||
<li>FlowFilesQueued</li>
|
||||
<li>BytesQueued</li>
|
||||
<li>BytesReadLast5Minutes</li>
|
||||
<li>BytesWrittenLast5Minutes</li>
|
||||
<li>ActiveThreads</li>
|
||||
<li>TotalTaskDurationSeconds</li>
|
||||
<li>jvm.uptime</li>
|
||||
<li>jvm.heap_used</li>
|
||||
<li>jvm.heap_usage</li>
|
||||
<li>jvm.non_heap_usage</li>
|
||||
<li>jvm.thread_states.runnable</li>
|
||||
<li>jvm.thread_states.blocked</li>
|
||||
<li>jvm.thread_states.timed_waiting</li>
|
||||
<li>jvm.thread_states.terminated</li>
|
||||
<li>jvm.thread_count</li>
|
||||
<li>jvm.daemon_thread_count</li>
|
||||
<li>jvm.file_descriptor_usage</li>
|
||||
<li>jvm.gc.runs</li>
|
||||
<li>jvm.gc.time</li>
|
||||
</ul>
|
||||
<p>
|
||||
In order to make use of these metrics in Ambari, a NIFI service must be created and installed
|
||||
in Ambari. Please consult the Ambari and NiFi documentation for further details.
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari;
|
||||
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.reporting.EventAccess;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.ReportingInitializationContext;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.Entity;
|
||||
import javax.ws.rs.client.Invocation;
|
||||
import javax.ws.rs.client.WebTarget;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
public class TestAmbariReportingTask {
|
||||
|
||||
private ProcessGroupStatus status;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
status = new ProcessGroupStatus();
|
||||
status.setId("1234");
|
||||
status.setFlowFilesReceived(5);
|
||||
status.setBytesReceived(10000);
|
||||
status.setFlowFilesSent(10);
|
||||
status.setBytesSent(20000);
|
||||
status.setQueuedCount(100);
|
||||
status.setQueuedContentSize(1024L);
|
||||
status.setBytesRead(60000L);
|
||||
status.setBytesWritten(80000L);
|
||||
status.setActiveThreadCount(5);
|
||||
|
||||
// create a processor status with processing time
|
||||
ProcessorStatus procStatus = new ProcessorStatus();
|
||||
procStatus.setProcessingNanos(123456789);
|
||||
|
||||
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
|
||||
processorStatuses.add(procStatus);
|
||||
status.setProcessorStatus(processorStatuses);
|
||||
|
||||
// create a group status with processing time
|
||||
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
|
||||
groupStatus.setProcessorStatus(processorStatuses);
|
||||
|
||||
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
|
||||
groupStatuses.add(groupStatus);
|
||||
status.setProcessGroupStatus(groupStatuses);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnTrigger() throws InitializationException, IOException {
|
||||
final String metricsUrl = "http://myambari:6188/ws/v1/timeline/metrics";
|
||||
final String applicationId = "NIFI";
|
||||
final String hostName = "localhost";
|
||||
|
||||
// create the jersey client mocks for handling the post
|
||||
final Client client = Mockito.mock(Client.class);
|
||||
final WebTarget target = Mockito.mock(WebTarget.class);
|
||||
final Invocation.Builder builder = Mockito.mock(Invocation.Builder.class);
|
||||
|
||||
final Response response = Mockito.mock(Response.class);
|
||||
Mockito.when(response.getStatus()).thenReturn(200);
|
||||
|
||||
Mockito.when(client.target(metricsUrl)).thenReturn(target);
|
||||
Mockito.when(target.request()).thenReturn(builder);
|
||||
Mockito.when(builder.post(Matchers.any(Entity.class))).thenReturn(response);
|
||||
|
||||
// mock the ReportingInitializationContext for initialize(...)
|
||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
|
||||
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
|
||||
Mockito.when(initContext.getLogger()).thenReturn(logger);
|
||||
|
||||
// mock the ConfigurationContext for setup(...)
|
||||
final ConfigurationContext configurationContext = Mockito.mock(ConfigurationContext.class);
|
||||
|
||||
// mock the ReportingContext for onTrigger(...)
|
||||
final ReportingContext context = Mockito.mock(ReportingContext.class);
|
||||
Mockito.when(context.getProperty(AmbariReportingTask.METRICS_COLLECTOR_URL))
|
||||
.thenReturn(new MockPropertyValue(metricsUrl, null));
|
||||
Mockito.when(context.getProperty(AmbariReportingTask.APPLICATION_ID))
|
||||
.thenReturn(new MockPropertyValue(applicationId, null));
|
||||
Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
|
||||
.thenReturn(new MockPropertyValue(hostName, null));
|
||||
|
||||
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
|
||||
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
|
||||
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
|
||||
|
||||
// create a testable instance of the reporting task
|
||||
final AmbariReportingTask task = new TestableAmbariReportingTask(client);
|
||||
task.initialize(initContext);
|
||||
task.setup(configurationContext);
|
||||
task.onTrigger(context);
|
||||
}
|
||||
|
||||
// override the creation of the client to provide a mock
|
||||
private class TestableAmbariReportingTask extends AmbariReportingTask {
|
||||
|
||||
private Client testClient;
|
||||
|
||||
public TestableAmbariReportingTask(Client client) {
|
||||
this.testClient = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Client createClient() {
|
||||
return testClient;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.api;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.json.Json;
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonBuilderFactory;
|
||||
import javax.json.JsonObject;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestMetricsBuilder {
|
||||
|
||||
@Test
|
||||
public void testBuildMetricsObject() {
|
||||
final Map<String, ?> config = Collections.emptyMap();
|
||||
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
|
||||
|
||||
final String instanceId = "1234-5678-1234-5678";
|
||||
final String applicationId = "NIFI";
|
||||
final String hostname = "localhost";
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
|
||||
final Map<String,String> metrics = new HashMap<>();
|
||||
metrics.put("a", "1");
|
||||
metrics.put("b", "2");
|
||||
|
||||
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
|
||||
final JsonObject metricsObject = metricsBuilder
|
||||
.applicationId(applicationId)
|
||||
.instanceId(instanceId)
|
||||
.hostname(hostname)
|
||||
.timestamp(timestamp)
|
||||
.addAllMetrics(metrics)
|
||||
.build();
|
||||
|
||||
final JsonArray metricsArray = metricsObject.getJsonArray("metrics");
|
||||
Assert.assertNotNull(metricsArray);
|
||||
Assert.assertEquals(2, metricsArray.size());
|
||||
|
||||
JsonObject firstMetric = metricsArray.getJsonObject(0);
|
||||
if (!"a".equals(firstMetric.getString(MetricFields.METRIC_NAME))) {
|
||||
firstMetric = metricsArray.getJsonObject(1);
|
||||
}
|
||||
|
||||
Assert.assertEquals("a", firstMetric.getString(MetricFields.METRIC_NAME));
|
||||
Assert.assertEquals(applicationId, firstMetric.getString(MetricFields.APP_ID));
|
||||
Assert.assertEquals(instanceId, firstMetric.getString(MetricFields.INSTANCE_ID));
|
||||
Assert.assertEquals(hostname, firstMetric.getString(MetricFields.HOSTNAME));
|
||||
Assert.assertEquals(String.valueOf(timestamp), firstMetric.getString(MetricFields.TIMESTAMP));
|
||||
|
||||
final JsonObject firstMetricValues = firstMetric.getJsonObject("metrics");
|
||||
Assert.assertEquals("1", firstMetricValues.getString("" + timestamp));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.ambari.metrics;
|
||||
|
||||
import com.yammer.metrics.core.VirtualMachineMetrics;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestMetricsService {
|
||||
|
||||
@Test
|
||||
public void testGetProcessGroupStatusMetrics() {
|
||||
ProcessGroupStatus status = new ProcessGroupStatus();
|
||||
status.setId("1234");
|
||||
status.setFlowFilesReceived(5);
|
||||
status.setBytesReceived(10000);
|
||||
status.setFlowFilesSent(10);
|
||||
status.setBytesSent(20000);
|
||||
status.setQueuedCount(100);
|
||||
status.setQueuedContentSize(1024L);
|
||||
status.setBytesRead(60000L);
|
||||
status.setBytesWritten(80000L);
|
||||
status.setActiveThreadCount(5);
|
||||
|
||||
// create a processor status with processing time
|
||||
ProcessorStatus procStatus = new ProcessorStatus();
|
||||
procStatus.setProcessingNanos(123456789);
|
||||
|
||||
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
|
||||
processorStatuses.add(procStatus);
|
||||
status.setProcessorStatus(processorStatuses);
|
||||
|
||||
// create a group status with processing time
|
||||
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
|
||||
groupStatus.setProcessorStatus(processorStatuses);
|
||||
|
||||
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
|
||||
groupStatuses.add(groupStatus);
|
||||
status.setProcessGroupStatus(groupStatuses);
|
||||
|
||||
final MetricsService service = new MetricsService();
|
||||
|
||||
final Map<String,String> metrics = service.getMetrics(status);
|
||||
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetVirtualMachineMetrics() {
|
||||
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
|
||||
final MetricsService service = new MetricsService();
|
||||
|
||||
final Map<String,String> metrics = service.getMetrics(virtualMachineMetrics);
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
|
||||
Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
<?xml version="1.0"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-nar-bundles</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-ambari-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>nifi-ambari-reporting-task</module>
|
||||
<module>nifi-ambari-nar</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.core</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
<version>2.19</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
|
@ -42,6 +42,7 @@
|
|||
<module>nifi-language-translation-bundle</module>
|
||||
<module>nifi-mongodb-bundle</module>
|
||||
<module>nifi-flume-bundle</module>
|
||||
<module>nifi-ambari-bundle</module>
|
||||
</modules>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
|
|
14
pom.xml
14
pom.xml
|
@ -98,6 +98,7 @@
|
|||
<spring.security.version>3.2.7.RELEASE</spring.security.version>
|
||||
<jersey.version>1.19</jersey.version>
|
||||
<hadoop.version>2.6.0</hadoop.version>
|
||||
<yammer.metrics.version>2.2.0</yammer.metrics.version>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
|
@ -501,7 +502,12 @@
|
|||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-ganglia</artifactId>
|
||||
<version>2.2.0</version>
|
||||
<version>${yammer.metrics.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.yammer.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>${yammer.metrics.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.jms</groupId>
|
||||
|
@ -877,6 +883,12 @@
|
|||
<version>0.3.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ambari-nar</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-properties</artifactId>
|
||||
|
|
Loading…
Reference in New Issue