NIFI-11180 Removed Ambari Reporting Task

- Updated TestRuntimeManifest to reference Prometheus Reporting Task

This closes #6952

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-02-14 15:44:28 -05:00 committed by exceptionfactory
parent aae6bafc6c
commit b56b1da842
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
13 changed files with 16 additions and 815 deletions

View File

@ -444,12 +444,6 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ambari-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-rethinkdb-nar</artifactId>

View File

@ -54,6 +54,8 @@ class TestRuntimeManifest {
public static final String LIST_HDFS_DEFAULT_SCHEDULE_TIME = "1 min";
private static final String REPORTING_TASK_DEFAULT_SCHEDULE_TIME = "60 sec";
@Test
void testRuntimeManifest() throws IOException {
final ObjectMapper objectMapper = new ObjectMapper();
@ -199,24 +201,24 @@ class TestRuntimeManifest {
assertEquals(1, propertyMaxUncommitDependency.getDependentValues().size());
assertEquals("true", propertyMaxUncommitDependency.getDependentValues().get(0));
// Verify AmbariReportingTask definition which also has @DefaultSchedule
final ReportingTaskDefinition ambariReportingTaskDef = getReportingTaskDefinition(bundles, "nifi-ambari-nar",
"org.apache.nifi.reporting.ambari.AmbariReportingTask");
// Verify PrometheusReportingTask definition which also has @DefaultSchedule
final ReportingTaskDefinition prometheusReportingTaskDef = getReportingTaskDefinition(bundles, "nifi-prometheus-nar",
"org.apache.nifi.reporting.prometheus.PrometheusReportingTask");
assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), ambariReportingTaskDef.getDefaultSchedulingStrategy());
assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), prometheusReportingTaskDef.getDefaultSchedulingStrategy());
final List<String> ambariSchedulingStrategies = ambariReportingTaskDef.getSupportedSchedulingStrategies();
assertNotNull(ambariSchedulingStrategies);
assertEquals(2, ambariSchedulingStrategies.size());
assertTrue(ambariSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(ambariSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));
final List<String> prometheusSchedulingStrategies = prometheusReportingTaskDef.getSupportedSchedulingStrategies();
assertNotNull(prometheusSchedulingStrategies);
assertEquals(2, prometheusSchedulingStrategies.size());
assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));
final Map<String, String> ambariDefaultSchedulingPeriods = ambariReportingTaskDef.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(ambariDefaultSchedulingPeriods);
assertEquals(2, ambariDefaultSchedulingPeriods.size());
final Map<String, String> prometheusDefaultSchedulingPeriods = prometheusReportingTaskDef.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(prometheusDefaultSchedulingPeriods);
assertEquals(2, prometheusDefaultSchedulingPeriods.size());
// TIMER_DRIVEN period should come from the @DefaultSchedule annotation that overrides the default value
assertEquals(LIST_HDFS_DEFAULT_SCHEDULE_TIME, ambariDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), ambariDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
assertEquals(REPORTING_TASK_DEFAULT_SCHEDULE_TIME, prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
// Verify JoltTransformRecord which has @EventDriven
final ProcessorDefinition joltTransformDef = getProcessorDefinition(bundles, "nifi-jolt-record-nar",

View File

@ -1,36 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ambari-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-ambari-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ambari-reporting-task</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -1,51 +0,0 @@
nifi-ambari-nar
Copyright 2015-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Dropwizard Metrics
The following NOTICE information applies:
Metrics
Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
This product includes software developed by Coda Hale and Yammer, Inc.
This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
LongAdder), which was released with the following comments:
Written by Doug Lea with assistance from members of JCP JSR-166
Expert Group and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/
The derived work in the nifi-metrics module is adapted from
https://github.com/dropwizard/metrics/blob/v2.2.0/metrics-core/src/main/java/com/yammer/metrics/core/VirtualMachineMetrics.java
and can be found in
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
(CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator)
(CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250)
(CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api)
(CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator)
(CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils)
(CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged)
(CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject)
(CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) jersey-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/)

View File

@ -1,64 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ambari-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-ambari-reporting-task</artifactId>
<description>Publishes NiFi metrics to Ambari</description>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-reporting-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-metrics</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,181 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.ambari;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.metrics.MetricsService;
import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
import org.apache.nifi.scheduling.SchedulingStrategy;
import javax.json.Json;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tags({"reporting", "ambari", "metrics"})
@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " +
"works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " +
"from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " +
"task at a frequency other than 60 seconds may produce unexpected results.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class AmbariReportingTask extends AbstractReportingTask {
static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
.name("Metrics Collector URL")
.description("The URL of the Ambari Metrics Collector Service")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("http://localhost:6188/ws/v1/timeline/metrics")
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
.name("Application ID")
.description("The Application ID to be included in the metrics sent to Ambari")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
.name("Process Group ID")
.description("If specified, the reporting task will send metrics about this process group only. If"
+ " not, the root process group is used and global metrics are sent.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile Client client;
private volatile JsonBuilderFactory factory;
private volatile JvmMetrics virtualMachineMetrics;
private volatile JsonObject previousMetrics = null;
private final MetricsService metricsService = new MetricsService();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(METRICS_COLLECTOR_URL);
properties.add(APPLICATION_ID);
properties.add(HOSTNAME);
properties.add(PROCESS_GROUP_ID);
return properties;
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
final Map<String, ?> config = Collections.emptyMap();
factory = Json.createBuilderFactory(config);
client = createClient();
virtualMachineMetrics = JmxJvmMetrics.getInstance();
previousMetrics = null;
}
// used for testing to allow tests to override the client
protected Client createClient() {
return ClientBuilder.newClient();
}
@Override
public void onTrigger(final ReportingContext context) {
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue();
final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet();
final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null;
final long start = System.currentTimeMillis();
// send the metrics from last execution
if (previousMetrics != null) {
final WebTarget metricsTarget = client.target(metricsCollectorUrl);
final Invocation.Builder invocation = metricsTarget.request();
final Entity<String> entity = Entity.json(previousMetrics.toString());
getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
final Response response = invocation.post(entity);
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis});
} else {
final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
}
}
// calculate the current metrics, but store them to be sent next time
final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId);
if(status != null) {
final Map<String,String> statusMetrics = metricsService.getMetrics(status, pgIdIsSet);
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
final JsonObject metricsObject = metricsBuilder
.applicationId(applicationId)
.instanceId(status.getId())
.hostname(hostname)
.timestamp(start)
.addAllMetrics(statusMetrics)
.addAllMetrics(jvmMetrics)
.build();
previousMetrics = metricsObject;
} else {
getLogger().error("No process group status with ID = {}", new Object[]{processGroupId});
previousMetrics = null;
}
}
}

View File

@ -1,15 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.ambari.AmbariReportingTask

View File

@ -1,57 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>AmbariReportingTask</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>AmbariReportingTask</h2>
<p>This ReportingTask sends the following metrics to Ambari:</p>
<ul>
<li>FlowFilesReceivedLast5Minutes</li>
<li>BytesReceivedLast5Minutes</li>
<li>FlowFilesSentLast5Minutes</li>
<li>BytesSentLast5Minutes</li>
<li>FlowFilesQueued</li>
<li>BytesQueued</li>
<li>BytesReadLast5Minutes</li>
<li>BytesWrittenLast5Minutes</li>
<li>ActiveThreads</li>
<li>TotalTaskDurationSeconds</li>
<li>jvm.uptime</li>
<li>jvm.heap_used</li>
<li>jvm.heap_usage</li>
<li>jvm.non_heap_usage</li>
<li>jvm.thread_states.runnable</li>
<li>jvm.thread_states.blocked</li>
<li>jvm.thread_states.timed_waiting</li>
<li>jvm.thread_states.terminated</li>
<li>jvm.thread_count</li>
<li>jvm.daemon_thread_count</li>
<li>jvm.file_descriptor_usage</li>
<li>jvm.gc.runs</li>
<li>jvm.gc.time</li>
</ul>
<p>
In order to make use of these metrics in Ambari, a NIFI service must be created and installed
in Ambari. Please consult the Ambari and NiFi documentation for further details.
</p>
</body>
</html>

View File

@ -1,142 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.ambari;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
public class TestAmbariReportingTask {
private ProcessGroupStatus status;
@BeforeEach
public void setup() {
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
status.setProcessGroupStatus(groupStatuses);
}
@Test
public void testOnTrigger() throws InitializationException, IOException {
final String metricsUrl = "http://myambari:6188/ws/v1/timeline/metrics";
final String applicationId = "NIFI";
final String hostName = "localhost";
// create the jersey client mocks for handling the post
final Client client = Mockito.mock(Client.class);
final WebTarget target = Mockito.mock(WebTarget.class);
final Invocation.Builder builder = Mockito.mock(Invocation.Builder.class);
final Response response = Mockito.mock(Response.class);
Mockito.when(response.getStatus()).thenReturn(200);
Mockito.when(client.target(metricsUrl)).thenReturn(target);
Mockito.when(target.request()).thenReturn(builder);
Mockito.when(builder.post(ArgumentMatchers.any(Entity.class))).thenReturn(response);
// mock the ReportingInitializationContext for initialize(...)
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
// mock the ConfigurationContext for setup(...)
final ConfigurationContext configurationContext = Mockito.mock(ConfigurationContext.class);
// mock the ReportingContext for onTrigger(...)
final ReportingContext context = Mockito.mock(ReportingContext.class);
Mockito.when(context.getProperty(AmbariReportingTask.METRICS_COLLECTOR_URL))
.thenReturn(new MockPropertyValue(metricsUrl));
Mockito.when(context.getProperty(AmbariReportingTask.APPLICATION_ID))
.thenReturn(new MockPropertyValue(applicationId));
Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
.thenReturn(new MockPropertyValue(hostName));
Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID))
.thenReturn(new MockPropertyValue("1234"));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
// create a testable instance of the reporting task
final AmbariReportingTask task = new TestableAmbariReportingTask(client);
task.initialize(initContext);
task.setup(configurationContext);
task.onTrigger(context);
}
// override the creation of the client to provide a mock
private class TestableAmbariReportingTask extends AmbariReportingTask {
private Client testClient;
public TestableAmbariReportingTask(Client client) {
this.testClient = client;
}
@Override
protected Client createClient() {
return testClient;
}
}
}

View File

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.ambari.api;
import org.apache.nifi.reporting.util.metrics.api.MetricFields;
import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
import org.junit.jupiter.api.Test;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class TestMetricsBuilder {
@Test
public void testBuildMetricsObject() {
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final String instanceId = "1234-5678-1234-5678";
final String applicationId = "NIFI";
final String hostname = "localhost";
final long timestamp = System.currentTimeMillis();
final Map<String,String> metrics = new HashMap<>();
metrics.put("a", "1");
metrics.put("b", "2");
final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
final JsonObject metricsObject = metricsBuilder
.applicationId(applicationId)
.instanceId(instanceId)
.hostname(hostname)
.timestamp(timestamp)
.addAllMetrics(metrics)
.build();
final JsonArray metricsArray = metricsObject.getJsonArray("metrics");
assertNotNull(metricsArray);
assertEquals(2, metricsArray.size());
JsonObject firstMetric = metricsArray.getJsonObject(0);
if (!"a".equals(firstMetric.getString(MetricFields.METRIC_NAME))) {
firstMetric = metricsArray.getJsonObject(1);
}
assertEquals("a", firstMetric.getString(MetricFields.METRIC_NAME));
assertEquals(applicationId, firstMetric.getString(MetricFields.APP_ID));
assertEquals(instanceId, firstMetric.getString(MetricFields.INSTANCE_ID));
assertEquals(hostname, firstMetric.getString(MetricFields.HOSTNAME));
assertEquals(String.valueOf(timestamp), firstMetric.getString(MetricFields.TIMESTAMP));
final JsonObject firstMetricValues = firstMetric.getJsonObject("metrics");
assertEquals("1", firstMetricValues.getString("" + timestamp));
}
}

View File

@ -1,139 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.ambari.metrics;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.reporting.util.metrics.MetricsService;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMetricsService {
@Test
public void testGetProcessGroupStatusMetrics() {
ProcessGroupStatus status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
status.setProcessGroupStatus(groupStatuses);
final MetricsService service = new MetricsService();
final Map<String,String> metrics = service.getMetrics(status, false);
assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_SECONDS));
assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_NANOS));
}
@Test
public void testGetProcessGroupStatusMetricsWithID() {
ProcessGroupStatus status = new ProcessGroupStatus();
String id = "1234";
status.setId(id);
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
// create a processor status with processing time
ProcessorStatus procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
// create a group status with processing time
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
status.setProcessGroupStatus(groupStatuses);
final MetricsService service = new MetricsService();
final Map<String,String> metrics = service.getMetrics(status, true);
assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id));
}
@Test
public void testGetVirtualMachineMetrics() {
final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
final MetricsService service = new MetricsService();
final Map<String,String> metrics = service.getMetrics(virtualMachineMetrics);
assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
}
}

View File

@ -1,31 +0,0 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-ambari-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-ambari-reporting-task</module>
<module>nifi-ambari-nar</module>
</modules>
</project>

View File

@ -46,7 +46,6 @@
<module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module>
<module>nifi-hbase-bundle</module>
<module>nifi-ambari-bundle</module>
<module>nifi-asana-bundle</module>
<module>nifi-media-bundle</module>
<module>nifi-avro-bundle</module>