diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index c0edbf2e00..e4f16e93d2 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -444,12 +444,6 @@ language governing permissions and limitations under the License. --> 2.0.0-SNAPSHOT nar - - org.apache.nifi - nifi-ambari-nar - 2.0.0-SNAPSHOT - nar - org.apache.nifi nifi-rethinkdb-nar diff --git a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java index 633d776637..d1ccd7a929 100644 --- a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java +++ b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java @@ -54,6 +54,8 @@ class TestRuntimeManifest { public static final String LIST_HDFS_DEFAULT_SCHEDULE_TIME = "1 min"; + private static final String REPORTING_TASK_DEFAULT_SCHEDULE_TIME = "60 sec"; + @Test void testRuntimeManifest() throws IOException { final ObjectMapper objectMapper = new ObjectMapper(); @@ -199,24 +201,24 @@ class TestRuntimeManifest { assertEquals(1, propertyMaxUncommitDependency.getDependentValues().size()); assertEquals("true", propertyMaxUncommitDependency.getDependentValues().get(0)); - // Verify AmbariReportingTask definition which also has @DefaultSchedule - final ReportingTaskDefinition ambariReportingTaskDef = getReportingTaskDefinition(bundles, "nifi-ambari-nar", - "org.apache.nifi.reporting.ambari.AmbariReportingTask"); + // Verify PrometheusReportingTask definition which also has @DefaultSchedule + final ReportingTaskDefinition prometheusReportingTaskDef = getReportingTaskDefinition(bundles, "nifi-prometheus-nar", + "org.apache.nifi.reporting.prometheus.PrometheusReportingTask"); - assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), ambariReportingTaskDef.getDefaultSchedulingStrategy()); + assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), prometheusReportingTaskDef.getDefaultSchedulingStrategy()); - final List ambariSchedulingStrategies = ambariReportingTaskDef.getSupportedSchedulingStrategies(); - assertNotNull(ambariSchedulingStrategies); - assertEquals(2, ambariSchedulingStrategies.size()); - assertTrue(ambariSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name())); - assertTrue(ambariSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name())); + final List prometheusSchedulingStrategies = prometheusReportingTaskDef.getSupportedSchedulingStrategies(); + assertNotNull(prometheusSchedulingStrategies); + assertEquals(2, prometheusSchedulingStrategies.size()); + assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name())); + assertTrue(prometheusSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name())); - final Map ambariDefaultSchedulingPeriods = ambariReportingTaskDef.getDefaultSchedulingPeriodBySchedulingStrategy(); - assertNotNull(ambariDefaultSchedulingPeriods); - assertEquals(2, ambariDefaultSchedulingPeriods.size()); + final Map prometheusDefaultSchedulingPeriods = prometheusReportingTaskDef.getDefaultSchedulingPeriodBySchedulingStrategy(); + assertNotNull(prometheusDefaultSchedulingPeriods); + assertEquals(2, prometheusDefaultSchedulingPeriods.size()); // TIMER_DRIVEN period should come from the @DefaultSchedule annotation that overrides the default value - assertEquals(LIST_HDFS_DEFAULT_SCHEDULE_TIME, ambariDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name())); - assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), ambariDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name())); + assertEquals(REPORTING_TASK_DEFAULT_SCHEDULE_TIME, prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name())); + assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name())); // Verify JoltTransformRecord which has @EventDriven final ProcessorDefinition joltTransformDef = getProcessorDefinition(bundles, "nifi-jolt-record-nar", diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml deleted file mode 100644 index 2e10385040..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/pom.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - 4.0.0 - - org.apache.nifi - nifi-ambari-bundle - 2.0.0-SNAPSHOT - - - nifi-ambari-nar - nar - - true - true - - - - org.apache.nifi - nifi-ambari-reporting-task - 2.0.0-SNAPSHOT - - - diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 186452cd9f..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-nar/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,51 +0,0 @@ -nifi-ambari-nar -Copyright 2015-2020 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -****************** -Apache Software License v2 -****************** - -The following binary components are provided under the Apache Software License v2 - - (ASLv2) Dropwizard Metrics - The following NOTICE information applies: - Metrics - Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team - This product includes software developed by Coda Hale and Yammer, Inc. - - This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, - LongAdder), which was released with the following comments: - - Written by Doug Lea with assistance from members of JCP JSR-166 - Expert Group and released to the public domain, as explained at - http://creativecommons.org/publicdomain/zero/1.0/ - - The derived work in the nifi-metrics module is adapted from - https://github.com/dropwizard/metrics/blob/v2.2.0/metrics-core/src/main/java/com/yammer/metrics/core/VirtualMachineMetrics.java - and can be found in - nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java - nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java - -************************ -Common Development and Distribution License 1.1 -************************ - -The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. - - (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) - (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) - (CDDL 1.1) (GPL2 w/ CPE) OSGi resource locator bundle (org.glassfish.hk2:osgi-resource-locator:jar:1.0.1 - http://glassfish.org/osgi-resource-locator) - (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250) - (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api) - (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator) - (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils) - (CDDL 1.1) (GPL2 w/ CPE) aopalliance version 1.0 repackaged as a module (org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b25 - https://hk2.java.net/external/aopalliance-repackaged) - (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject) - (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net) - (CDDL 1.1) (GPL2 w/ CPE) jersey-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/) - (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/) - (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/) - diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml deleted file mode 100644 index 267043e31c..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml +++ /dev/null @@ -1,64 +0,0 @@ - - - - 4.0.0 - - org.apache.nifi - nifi-ambari-bundle - 2.0.0-SNAPSHOT - - - nifi-ambari-reporting-task - Publishes NiFi metrics to Ambari - - - - org.glassfish.jersey.core - jersey-client - - - javax.json - javax.json-api - 1.1.4 - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-utils - 2.0.0-SNAPSHOT - - - org.apache.nifi - nifi-reporting-utils - 2.0.0-SNAPSHOT - - - org.apache.nifi - nifi-metrics - 2.0.0-SNAPSHOT - - - - org.apache.nifi - nifi-mock - 2.0.0-SNAPSHOT - test - - - diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java deleted file mode 100644 index 886e8913b3..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari; - -import org.apache.nifi.annotation.configuration.DefaultSchedule; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.metrics.jvm.JmxJvmMetrics; -import org.apache.nifi.metrics.jvm.JvmMetrics; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.reporting.AbstractReportingTask; -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.util.metrics.MetricsService; -import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; -import org.apache.nifi.scheduling.SchedulingStrategy; - -import javax.json.Json; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -@Tags({"reporting", "ambari", "metrics"}) -@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " + - "works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " + - "from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " + - "task at a frequency other than 60 seconds may produce unexpected results.") -@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") -public class AmbariReportingTask extends AbstractReportingTask { - - static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder() - .name("Metrics Collector URL") - .description("The URL of the Ambari Metrics Collector Service") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("http://localhost:6188/ws/v1/timeline/metrics") - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); - - static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() - .name("Application ID") - .description("The Application ID to be included in the metrics sent to Ambari") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("nifi") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Hostname") - .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("${hostname(true)}") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder() - .name("Process Group ID") - .description("If specified, the reporting task will send metrics about this process group only. If" - + " not, the root process group is used and global metrics are sent.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - private volatile Client client; - private volatile JsonBuilderFactory factory; - private volatile JvmMetrics virtualMachineMetrics; - private volatile JsonObject previousMetrics = null; - - private final MetricsService metricsService = new MetricsService(); - - @Override - protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(); - properties.add(METRICS_COLLECTOR_URL); - properties.add(APPLICATION_ID); - properties.add(HOSTNAME); - properties.add(PROCESS_GROUP_ID); - return properties; - } - - @OnScheduled - public void setup(final ConfigurationContext context) throws IOException { - final Map config = Collections.emptyMap(); - factory = Json.createBuilderFactory(config); - client = createClient(); - virtualMachineMetrics = JmxJvmMetrics.getInstance(); - previousMetrics = null; - } - - // used for testing to allow tests to override the client - protected Client createClient() { - return ClientBuilder.newClient(); - } - - @Override - public void onTrigger(final ReportingContext context) { - final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue(); - final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue(); - final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); - - final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet(); - final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null; - - final long start = System.currentTimeMillis(); - - // send the metrics from last execution - if (previousMetrics != null) { - final WebTarget metricsTarget = client.target(metricsCollectorUrl); - final Invocation.Builder invocation = metricsTarget.request(); - - final Entity entity = Entity.json(previousMetrics.toString()); - getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()}); - - final Response response = invocation.post(entity); - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start); - getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis}); - } else { - final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error"; - getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity}); - } - } - - // calculate the current metrics, but store them to be sent next time - final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId); - - if(status != null) { - final Map statusMetrics = metricsService.getMetrics(status, pgIdIsSet); - final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); - - final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); - - final JsonObject metricsObject = metricsBuilder - .applicationId(applicationId) - .instanceId(status.getId()) - .hostname(hostname) - .timestamp(start) - .addAllMetrics(statusMetrics) - .addAllMetrics(jvmMetrics) - .build(); - - previousMetrics = metricsObject; - } else { - getLogger().error("No process group status with ID = {}", new Object[]{processGroupId}); - previousMetrics = null; - } - } - -} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask deleted file mode 100644 index 274710acdd..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.reporting.ambari.AmbariReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html deleted file mode 100644 index 49409cf7e8..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.ambari.AmbariReportingTask/additionalDetails.html +++ /dev/null @@ -1,57 +0,0 @@ - - - - - - AmbariReportingTask - - - - -

AmbariReportingTask

- -

This ReportingTask sends the following metrics to Ambari:

-
    -
  • FlowFilesReceivedLast5Minutes
  • -
  • BytesReceivedLast5Minutes
  • -
  • FlowFilesSentLast5Minutes
  • -
  • BytesSentLast5Minutes
  • -
  • FlowFilesQueued
  • -
  • BytesQueued
  • -
  • BytesReadLast5Minutes
  • -
  • BytesWrittenLast5Minutes
  • -
  • ActiveThreads
  • -
  • TotalTaskDurationSeconds
  • -
  • jvm.uptime
  • -
  • jvm.heap_used
  • -
  • jvm.heap_usage
  • -
  • jvm.non_heap_usage
  • -
  • jvm.thread_states.runnable
  • -
  • jvm.thread_states.blocked
  • -
  • jvm.thread_states.timed_waiting
  • -
  • jvm.thread_states.terminated
  • -
  • jvm.thread_count
  • -
  • jvm.daemon_thread_count
  • -
  • jvm.file_descriptor_usage
  • -
  • jvm.gc.runs
  • -
  • jvm.gc.time
  • -
-

- In order to make use of these metrics in Ambari, a NIFI service must be created and installed - in Ambari. Please consult the Ambari and NiFi documentation for further details. -

- - diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java deleted file mode 100644 index ed49e9ca00..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari; - -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.reporting.EventAccess; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ReportingInitializationContext; -import org.apache.nifi.util.MockPropertyValue; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mockito; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; - -public class TestAmbariReportingTask { - - private ProcessGroupStatus status; - - @BeforeEach - public void setup() { - status = new ProcessGroupStatus(); - status.setId("1234"); - status.setFlowFilesReceived(5); - status.setBytesReceived(10000); - status.setFlowFilesSent(10); - status.setBytesSent(20000); - status.setQueuedCount(100); - status.setQueuedContentSize(1024L); - status.setBytesRead(60000L); - status.setBytesWritten(80000L); - status.setActiveThreadCount(5); - - // create a processor status with processing time - ProcessorStatus procStatus = new ProcessorStatus(); - procStatus.setProcessingNanos(123456789); - - Collection processorStatuses = new ArrayList<>(); - processorStatuses.add(procStatus); - status.setProcessorStatus(processorStatuses); - - // create a group status with processing time - ProcessGroupStatus groupStatus = new ProcessGroupStatus(); - groupStatus.setProcessorStatus(processorStatuses); - - Collection groupStatuses = new ArrayList<>(); - groupStatuses.add(groupStatus); - status.setProcessGroupStatus(groupStatuses); - } - - @Test - public void testOnTrigger() throws InitializationException, IOException { - final String metricsUrl = "http://myambari:6188/ws/v1/timeline/metrics"; - final String applicationId = "NIFI"; - final String hostName = "localhost"; - - // create the jersey client mocks for handling the post - final Client client = Mockito.mock(Client.class); - final WebTarget target = Mockito.mock(WebTarget.class); - final Invocation.Builder builder = Mockito.mock(Invocation.Builder.class); - - final Response response = Mockito.mock(Response.class); - Mockito.when(response.getStatus()).thenReturn(200); - - Mockito.when(client.target(metricsUrl)).thenReturn(target); - Mockito.when(target.request()).thenReturn(builder); - Mockito.when(builder.post(ArgumentMatchers.any(Entity.class))).thenReturn(response); - - // mock the ReportingInitializationContext for initialize(...) - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); - - // mock the ConfigurationContext for setup(...) - final ConfigurationContext configurationContext = Mockito.mock(ConfigurationContext.class); - - // mock the ReportingContext for onTrigger(...) - final ReportingContext context = Mockito.mock(ReportingContext.class); - Mockito.when(context.getProperty(AmbariReportingTask.METRICS_COLLECTOR_URL)) - .thenReturn(new MockPropertyValue(metricsUrl)); - Mockito.when(context.getProperty(AmbariReportingTask.APPLICATION_ID)) - .thenReturn(new MockPropertyValue(applicationId)); - Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME)) - .thenReturn(new MockPropertyValue(hostName)); - Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID)) - .thenReturn(new MockPropertyValue("1234")); - - - final EventAccess eventAccess = Mockito.mock(EventAccess.class); - Mockito.when(context.getEventAccess()).thenReturn(eventAccess); - Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); - - // create a testable instance of the reporting task - final AmbariReportingTask task = new TestableAmbariReportingTask(client); - task.initialize(initContext); - task.setup(configurationContext); - task.onTrigger(context); - } - - // override the creation of the client to provide a mock - private class TestableAmbariReportingTask extends AmbariReportingTask { - - private Client testClient; - - public TestableAmbariReportingTask(Client client) { - this.testClient = client; - } - - @Override - protected Client createClient() { - return testClient; - } - } -} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java deleted file mode 100644 index ce3c689c8a..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -import org.apache.nifi.reporting.util.metrics.api.MetricFields; -import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; -import org.junit.jupiter.api.Test; - -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -public class TestMetricsBuilder { - - @Test - public void testBuildMetricsObject() { - final Map config = Collections.emptyMap(); - final JsonBuilderFactory factory = Json.createBuilderFactory(config); - - final String instanceId = "1234-5678-1234-5678"; - final String applicationId = "NIFI"; - final String hostname = "localhost"; - final long timestamp = System.currentTimeMillis(); - - final Map metrics = new HashMap<>(); - metrics.put("a", "1"); - metrics.put("b", "2"); - - final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); - final JsonObject metricsObject = metricsBuilder - .applicationId(applicationId) - .instanceId(instanceId) - .hostname(hostname) - .timestamp(timestamp) - .addAllMetrics(metrics) - .build(); - - final JsonArray metricsArray = metricsObject.getJsonArray("metrics"); - assertNotNull(metricsArray); - assertEquals(2, metricsArray.size()); - - JsonObject firstMetric = metricsArray.getJsonObject(0); - if (!"a".equals(firstMetric.getString(MetricFields.METRIC_NAME))) { - firstMetric = metricsArray.getJsonObject(1); - } - - assertEquals("a", firstMetric.getString(MetricFields.METRIC_NAME)); - assertEquals(applicationId, firstMetric.getString(MetricFields.APP_ID)); - assertEquals(instanceId, firstMetric.getString(MetricFields.INSTANCE_ID)); - assertEquals(hostname, firstMetric.getString(MetricFields.HOSTNAME)); - assertEquals(String.valueOf(timestamp), firstMetric.getString(MetricFields.TIMESTAMP)); - - final JsonObject firstMetricValues = firstMetric.getJsonObject("metrics"); - assertEquals("1", firstMetricValues.getString("" + timestamp)); - } - -} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java deleted file mode 100644 index 80fc1b194b..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.metrics; - -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.metrics.jvm.JmxJvmMetrics; -import org.apache.nifi.metrics.jvm.JvmMetrics; -import org.apache.nifi.reporting.util.metrics.MetricNames; -import org.apache.nifi.reporting.util.metrics.MetricsService; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestMetricsService { - - @Test - public void testGetProcessGroupStatusMetrics() { - ProcessGroupStatus status = new ProcessGroupStatus(); - status.setId("1234"); - status.setFlowFilesReceived(5); - status.setBytesReceived(10000); - status.setFlowFilesSent(10); - status.setBytesSent(20000); - status.setQueuedCount(100); - status.setQueuedContentSize(1024L); - status.setBytesRead(60000L); - status.setBytesWritten(80000L); - status.setActiveThreadCount(5); - - // create a processor status with processing time - ProcessorStatus procStatus = new ProcessorStatus(); - procStatus.setProcessingNanos(123456789); - - Collection processorStatuses = new ArrayList<>(); - processorStatuses.add(procStatus); - status.setProcessorStatus(processorStatuses); - - // create a group status with processing time - ProcessGroupStatus groupStatus = new ProcessGroupStatus(); - groupStatus.setProcessorStatus(processorStatuses); - - Collection groupStatuses = new ArrayList<>(); - groupStatuses.add(groupStatus); - status.setProcessGroupStatus(groupStatuses); - - final MetricsService service = new MetricsService(); - - final Map metrics = service.getMetrics(status, false); - - assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED)); - assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED)); - assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT)); - assertTrue(metrics.containsKey(MetricNames.BYTES_SENT)); - assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED)); - assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED)); - assertTrue(metrics.containsKey(MetricNames.BYTES_READ)); - assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN)); - assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS)); - assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_SECONDS)); - assertTrue(metrics.containsKey(MetricNames.TOTAL_TASK_DURATION_NANOS)); - } - - @Test - public void testGetProcessGroupStatusMetricsWithID() { - ProcessGroupStatus status = new ProcessGroupStatus(); - String id = "1234"; - status.setId(id); - status.setFlowFilesReceived(5); - status.setBytesReceived(10000); - status.setFlowFilesSent(10); - status.setBytesSent(20000); - status.setQueuedCount(100); - status.setQueuedContentSize(1024L); - status.setBytesRead(60000L); - status.setBytesWritten(80000L); - status.setActiveThreadCount(5); - - // create a processor status with processing time - ProcessorStatus procStatus = new ProcessorStatus(); - procStatus.setProcessingNanos(123456789); - - Collection processorStatuses = new ArrayList<>(); - processorStatuses.add(procStatus); - status.setProcessorStatus(processorStatuses); - - // create a group status with processing time - ProcessGroupStatus groupStatus = new ProcessGroupStatus(); - groupStatus.setProcessorStatus(processorStatuses); - - Collection groupStatuses = new ArrayList<>(); - groupStatuses.add(groupStatus); - status.setProcessGroupStatus(groupStatuses); - - final MetricsService service = new MetricsService(); - - final Map metrics = service.getMetrics(status, true); - - assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id)); - } - - @Test - public void testGetVirtualMachineMetrics() { - final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance(); - final MetricsService service = new MetricsService(); - - final Map metrics = service.getMetrics(virtualMachineMetrics); - assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME)); - assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED)); - assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE)); - assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE)); - assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE)); - assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED)); - assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING)); - assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED)); - assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT)); - assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT)); - assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE)); - } - -} diff --git a/nifi-nar-bundles/nifi-ambari-bundle/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/pom.xml deleted file mode 100644 index caa02b8cf5..0000000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/pom.xml +++ /dev/null @@ -1,31 +0,0 @@ - - - - 4.0.0 - - org.apache.nifi - nifi-nar-bundles - 2.0.0-SNAPSHOT - - - nifi-ambari-bundle - pom - - - nifi-ambari-reporting-task - nifi-ambari-nar - - diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index a76d1aac05..24afd207f9 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -46,7 +46,6 @@ nifi-language-translation-bundle nifi-mongodb-bundle nifi-hbase-bundle - nifi-ambari-bundle nifi-asana-bundle nifi-media-bundle nifi-avro-bundle