diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 108438bf85..7710b3db67 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -516,6 +516,16 @@ nifi-redis-nar nar + + org.apache.nifi + nifi-metrics-reporter-service-api-nar + nar + + + org.apache.nifi + nifi-metrics-reporting-nar + nar + diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java index 2a2aab2f52..38d1619e14 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java @@ -18,7 +18,9 @@ package org.apache.nifi.util; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.nifi.action.Action; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -31,10 +33,14 @@ public class MockEventAccess implements EventAccess { private ProcessGroupStatus processGroupStatus; private final List provenanceRecords = new ArrayList<>(); private final List flowChanges = new ArrayList<>(); + private final Map processGroupStatusMap = new HashMap<>(); public void setProcessGroupStatus(final ProcessGroupStatus status) { this.processGroupStatus = status; } + public void setProcessGroupStatus(String groupId, final ProcessGroupStatus status) { + processGroupStatusMap.put(groupId, status); + } @Override public ProcessGroupStatus getControllerStatus() { @@ -43,7 +49,7 @@ public class MockEventAccess implements EventAccess { @Override public ProcessGroupStatus getGroupStatus(final String groupId) { - return null; + return processGroupStatusMap.get(groupId); } @Override diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml new file mode 100644 index 0000000000..de48b81ba1 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml @@ -0,0 +1,40 @@ + + + + + nifi-metrics-reporting-bundle + org.apache.nifi + 1.5.0-SNAPSHOT + + nar + 4.0.0 + + nifi-metrics-reporter-service-api-nar + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-metrics-reporter-service-api + 1.5.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..76b99fd939 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,25 @@ +nifi-metrics-reporter-service-api-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Dropwizard Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, + LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml new file mode 100644 index 0000000000..173d35cae3 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml @@ -0,0 +1,27 @@ + + + + + nifi-metrics-reporting-bundle + org.apache.nifi + 1.5.0-SNAPSHOT + + 4.0.0 + + nifi-metrics-reporter-service-api + diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java new file mode 100644 index 0000000000..d87be15f83 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * An interface for controller services used by MetricsReportingTask. In order to report to a new + * client, implement this interface and make sure to return the desired implementation of {@link ScheduledReporter}. + * + * @author Omer Hadari + */ +public interface MetricReporterService extends ControllerService { + + /** + * Create a reporter to a metric client (i.e. graphite). + * + * @param metricRegistry registry with the metrics to report. + * @return an instance of the reporter. + * @throws ProcessException if there was an error creating the reporter. + */ + ScheduledReporter createReporter(MetricRegistry metricRegistry) throws ProcessException; +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml new file mode 100644 index 0000000000..f92b17b1cc --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml @@ -0,0 +1,42 @@ + + + + + nifi-metrics-reporting-bundle + org.apache.nifi + 1.5.0-SNAPSHOT + + nar + 4.0.0 + + nifi-metrics-reporting-nar + + + + org.apache.nifi + nifi-metrics-reporting-task + 1.5.0-SNAPSHOT + + + org.apache.nifi + nifi-metrics-reporter-service-api-nar + 1.5.0-SNAPSHOT + nar + + + diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..504e3f12e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,25 @@ +nifi-metrics-reporting-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Dropwizard Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, + LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml new file mode 100644 index 0000000000..7b437bca31 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml @@ -0,0 +1,55 @@ + + + + + nifi-metrics-reporting-bundle + org.apache.nifi + 1.5.0-SNAPSHOT + + 4.0.0 + + nifi-metrics-reporting-task + + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-metrics-reporter-service-api + 1.5.0-SNAPSHOT + provided + + + io.dropwizard.metrics + metrics-graphite + 3.1.2 + + + io.dropwizard.metrics + metrics-jvm + 3.1.2 + + + org.apache.nifi + nifi-mock + test + + + diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java new file mode 100644 index 0000000000..d6e2fbab8c --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A metric set of NiFi instance related metrics. + * + * @author Omer Hadari + */ +public class FlowMetricSet implements MetricSet { + + + /** + * Reference to the process status that should be reported. Should be updated when the status changes. + */ + private final AtomicReference currentStatusReference; + + /** + * Create a metric set that will look at a given process status reference for deciding metrics. + * + * @param currentStatusReference a reference to the process status. + */ + public FlowMetricSet(AtomicReference currentStatusReference) { + this.currentStatusReference = currentStatusReference; + } + + /** + * Create a map of {@link Gauge}s for the {@link #currentStatusReference}. This methods reports the metrics as + * found in the reference. + * + * @return map between the metric name and a {@link Gauge} to it's value. + */ + @Override + public Map getMetrics() { + + Map metrics = new HashMap<>(); + + metrics.put(MetricNames.ACTIVE_THREADS, (Gauge) () -> currentStatusReference.get().getActiveThreadCount()); + metrics.put(MetricNames.BYTES_QUEUED, (Gauge) () -> currentStatusReference.get().getQueuedContentSize()); + metrics.put(MetricNames.BYTES_READ, (Gauge) () -> currentStatusReference.get().getBytesRead()); + metrics.put(MetricNames.BYTES_RECEIVED, (Gauge) () -> currentStatusReference.get().getBytesReceived()); + metrics.put(MetricNames.BYTES_SENT, (Gauge) () -> currentStatusReference.get().getBytesSent()); + metrics.put(MetricNames.BYTES_WRITTEN, (Gauge) () -> currentStatusReference.get().getBytesWritten()); + metrics.put(MetricNames.FLOW_FILES_RECEIVED, (Gauge) () -> currentStatusReference.get().getFlowFilesReceived()); + metrics.put(MetricNames.FLOW_FILES_QUEUED, (Gauge) () -> currentStatusReference.get().getQueuedCount()); + metrics.put(MetricNames.FLOW_FILES_SENT, (Gauge) () -> currentStatusReference.get().getFlowFilesSent()); + metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, (Gauge) () -> calculateProcessingNanos(currentStatusReference.get())); + + return metrics; + } + + /** + * Calculate the total processing time of a process group. + * + * @param status the current process group status. + * @return the total amount of nanoseconds spent in each processor in the process group. + */ + private long calculateProcessingNanos(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStats : status.getProcessorStatus()) { + nanos += procStats.getProcessingNanos(); + } + + for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + nanos += calculateProcessingNanos(childGroupStatus); + } + + return nanos; + } +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java new file mode 100644 index 0000000000..fa06b8b984 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics; + +/** + * The Metric names to send to Ambari. + */ +public interface MetricNames { + + // NiFi Metrics + String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; + String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; + String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; + String BYTES_SENT = "BytesSentLast5Minutes"; + String FLOW_FILES_QUEUED = "FlowFilesQueued"; + String BYTES_QUEUED = "BytesQueued"; + String BYTES_READ = "BytesReadLast5Minutes"; + String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; + String ACTIVE_THREADS = "ActiveThreads"; + String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java new file mode 100644 index 0000000000..55623ce52f --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.metrics.reporting.task.MetricsReportingTask; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A controller service that provides metric reporters for graphite, can be used by {@link MetricsReportingTask}. + * + * @author Omer Hadari + */ +@Tags({"metrics", "reporting", "graphite"}) +@CapabilityDescription("A controller service that provides metric reporters for graphite. " + + "Used by MetricsReportingTask.") +public class GraphiteMetricReporterService extends AbstractControllerService implements MetricReporterService { + + /** + * Points to the hostname of the graphite listener. + */ + public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host") + .description("The hostname of the carbon listener") + .required(true) + .addValidator(StandardValidators.URI_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * Points to the port on which the graphite server listens. + */ + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("port") + .displayName("Port") + .description("The port on which carbon listens") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * Points to the charset name that the graphite server expects. + */ + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("charset") + .displayName("Charset") + .description("The charset used by the graphite server") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + /** + * Prefix for all metric names sent by reporters - for separation of NiFi stats in graphite. + */ + protected static final PropertyDescriptor METRIC_NAME_PREFIX = new PropertyDescriptor.Builder() + .name("metric name prefix") + .displayName("Metric Name Prefix") + .description("A prefix that will be used for all metric names sent by reporters provided by this service.") + .required(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * List of property descriptors used by the service. + */ + private static final List properties; + + static { + final List props = new ArrayList<>(); + props.add(HOST); + props.add(PORT); + props.add(CHARSET); + props.add(METRIC_NAME_PREFIX); + properties = Collections.unmodifiableList(props); + } + + /** + * Graphite sender, a connection to the server. + */ + private GraphiteSender graphiteSender; + + /** + * The configured {@link #METRIC_NAME_PREFIX} value. + */ + private String metricNamePrefix; + + /** + * Create the {@link #graphiteSender} according to configuration. + * + * @param context used to access properties. + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue(); + int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + graphiteSender = createSender(host, port, charset); + metricNamePrefix = context.getProperty(METRIC_NAME_PREFIX).evaluateAttributeExpressions().getValue(); + } + + /** + * Close the graphite sender. + * + * @throws IOException if failed to close the connection. + */ + @OnDisabled + public void shutdown() throws IOException { + try { + graphiteSender.close(); + } finally { + graphiteSender = null; + } + } + + /** + * Use the {@link #graphiteSender} in order to create a reporter. + * + * @param metricRegistry registry with the metrics to report. + * @return a reporter instance. + */ + @Override + public ScheduledReporter createReporter(MetricRegistry metricRegistry) { + return GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricNamePrefix).build(graphiteSender); + + } + + /** + * Create a sender. + * + * @param host the hostname of the server to connect to. + * @param port the port on which the server listens. + * @param charset the charset in which the server expects logs. + * @return The created sender. + */ + protected GraphiteSender createSender(String host, int port, Charset charset) { + return new Graphite(host, port, SocketFactory.getDefault(), charset); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java new file mode 100644 index 0000000000..37eb194cc6 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.task; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.metrics.FlowMetricSet; +import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A reporting task for NiFi instance and JVM related metrics. + *

+ * This task reports metrics to services according to a provided {@link ScheduledReporter}, reached by using a + * {@link MetricReporterService}. In order to report to different clients, simply use different implementations of + * the controller service. + * + * @author Omer Hadari + * @see MetricReporterService + */ +@Tags({"metrics", "reporting"}) +@CapabilityDescription("This reporting task reports a set of metrics regarding the JVM and the NiFi instance" + + "to a reporter. The reporter is provided by a MetricReporterService. It can be optionally used for a specific" + + "process group if a property with the group id is provided.") +public class MetricsReportingTask extends AbstractReportingTask { + + /** + * Points to the service which provides {@link ScheduledReporter} instances. + */ + protected static final PropertyDescriptor REPORTER_SERVICE = new PropertyDescriptor.Builder() + .name("metric reporter service") + .displayName("Metric Reporter Service") + .description("The service that provides a reporter for the gathered metrics") + .identifiesControllerService(MetricReporterService.class) + .required(true) + .build(); + + /** + * Metrics of the process group with this ID should be reported. If not specified, use the root process group. + */ + protected static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder() + .name("process group id") + .displayName("Process Group ID") + .description("The id of the process group to report. If not specified, metrics of the root process group" + + "are reported.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Contains the metrics that should be reported. + */ + private MetricRegistry metricRegistry; + + /** + * Used for actually reporting metrics. + */ + private ScheduledReporter reporter; + + // Protected for testing sake. DO NOT ACCESS FOR OTHER PURPOSES. + /** + * Points to the most recent process group status seen by this task. + */ + protected AtomicReference currentStatusReference; + + /** + * Register all wanted metrics to {@link #metricRegistry}. + *

+ * {@inheritDoc} + */ + @Override + protected void init(ReportingInitializationContext config) { + metricRegistry = new MetricRegistry(); + currentStatusReference = new AtomicReference<>(); + metricRegistry.registerAll(new MemoryUsageGaugeSet()); + metricRegistry.registerAll(new FlowMetricSet(currentStatusReference)); + } + + /** + * Populate {@link #reporter} using the {@link MetricReporterService}. If the reporter is active already, + * do nothing. + * + * @param context used for accessing the controller service. + */ + @OnScheduled + public void connect(ConfigurationContext context) { + if (reporter == null) { + reporter = ((MetricReporterService) context.getProperty(REPORTER_SERVICE).asControllerService()) + .createReporter(metricRegistry); + } + } + + /** + * Report the registered metrics. + * + * @param context used for getting the most recent {@link ProcessGroupStatus}. + */ + @Override + public void onTrigger(ReportingContext context) { + String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue(); + + ProcessGroupStatus statusToReport = groupId == null + ? context.getEventAccess().getControllerStatus() + : context.getEventAccess().getGroupStatus(groupId); + + if (statusToReport != null) { + currentStatusReference.set(statusToReport); + reporter.report(); + } else { + getLogger().error("Process group with provided group id could not be found."); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + List properties = new ArrayList<>(); + properties.add(REPORTER_SERVICE); + properties.add(PROCESS_GROUP_ID); + return Collections.unmodifiableList(properties); + } +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..0ddc70ac8e --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.metrics.reporting.reporter.service.GraphiteMetricReporterService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 0000000000..8c554a4252 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.metrics.reporting.task.MetricsReportingTask diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java new file mode 100644 index 0000000000..e7257a08c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link GraphiteMetricReporterService}. + * + * @author Omer Hadari + */ +@RunWith(MockitoJUnitRunner.class) +public class GraphiteMetricReporterServiceTest { + + /** + * Service identifier for registerting the tested service to the tests runner. + */ + private static final String SERVICE_IDENTIFIER = "graphite-metric-reporter-service"; + + /** + * Sample host name for the {@link GraphiteMetricReporterService#HOST} property. + */ + private static final String TEST_HOST = "some-host"; + + /** + * Sample port for the {@link GraphiteMetricReporterService#PORT} property. + */ + private static final int TEST_PORT = 12345; + + /** + * Sample charset for the {@link GraphiteMetricReporterService#CHARSET} property. + */ + private static final Charset TEST_CHARSET = StandardCharsets.UTF_16LE; + + /** + * Sample prefix for metric names. + */ + private static final String METRIC_NAMES_PREFIX = "test-metric-name-prefix"; + + /** + * Sample metric for verifying that a graphite sender with the correct configuration is used. + */ + private static final String TEST_METRIC_NAME = "test-metric"; + + /** + * The fixed value of {@link #TEST_METRIC_NAME}. + */ + private static final int TEST_METRIC_VALUE = 2; + + /** + * Dummy processor for creating {@link #runner}. + */ + @Mock + private Processor processorDummy; + + /** + * Mock sender for verifying creation with the correct configuration. + */ + @Mock + private GraphiteSender graphiteSenderMock; + + /** + * Stub metric registry, that contains the test metrics. + */ + private MetricRegistry metricRegistryStub; + + /** + * Test runner for activating and configuring the service. + */ + private TestRunner runner; + + /** + * The test subject. + */ + private GraphiteMetricReporterService testedService; + + /** + * Instantiate the runner and mocks between tests. Register metrics to the {@link #metricRegistryStub}. + */ + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(processorDummy); + testedService = new GraphiteMetricReporterService(); + + metricRegistryStub = new MetricRegistry(); + metricRegistryStub.register(TEST_METRIC_NAME, ((Gauge) () -> TEST_METRIC_VALUE)); + + } + + + /** + * Make sure that a correctly configured service can be activated. + */ + @Test + public void testGraphiteMetricReporterSanityConfiguration() throws Exception { + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + runner.enableControllerService(testedService); + + runner.assertValid(testedService); + } + + + /** + * Make sure that a correctly configured service provides a reporter for the matching configuration, and + * actually reports to the correct address. + */ + @Test + public void testCreateReporterUsesCorrectSender() throws Exception { + testedService = new TestableGraphiteMetricReporterService(); + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + when(graphiteSenderMock.isConnected()).thenReturn(false); + runner.enableControllerService(testedService); + + ScheduledReporter createdReporter = testedService.createReporter(metricRegistryStub); + createdReporter.report(); + + String expectedMetricName = MetricRegistry.name(METRIC_NAMES_PREFIX, TEST_METRIC_NAME); + verify(graphiteSenderMock).send(eq(expectedMetricName), eq(String.valueOf(TEST_METRIC_VALUE)), anyLong()); + } + + /** + * Make sure that {@link GraphiteMetricReporterService#shutdown()} closes the connection to graphite. + */ + @Test + public void testShutdownClosesSender() throws Exception { + testedService = new TestableGraphiteMetricReporterService(); + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + runner.enableControllerService(testedService); + runner.disableControllerService(testedService); + + verify(graphiteSenderMock).close(); + } + + /** + * Set the test subject's properties. + * + * @param host populates {@link GraphiteMetricReporterService#HOST}. + * @param port populates {@link GraphiteMetricReporterService#PORT}. + * @param charset populates {@link GraphiteMetricReporterService#CHARSET}. + * @param metricNamesPrefix populates {@link GraphiteMetricReporterService#METRIC_NAME_PREFIX}. + */ + private void setServiceProperties(String host, int port, Charset charset, String metricNamesPrefix) { + runner.setProperty(testedService, GraphiteMetricReporterService.HOST, host); + runner.setProperty(testedService, GraphiteMetricReporterService.PORT, String.valueOf(port)); + runner.setProperty(testedService, GraphiteMetricReporterService.CHARSET, charset.name()); + runner.setProperty(testedService, GraphiteMetricReporterService.METRIC_NAME_PREFIX, metricNamesPrefix); + } + + /** + * This class is a patch. It overrides {@link GraphiteMetricReporterService#createSender(String, int, Charset)} + * so that it is possible to verify a correct creation of graphite senders according to property values. + */ + private class TestableGraphiteMetricReporterService extends GraphiteMetricReporterService { + + /** + * Overrides the actual methods in order to inject the mock {@link #graphiteSenderMock}. + *

+ * If this method is called with the test property values, it returns the mock. Otherwise operate + * regularly. + * + * @param host the provided hostname. + * @param port the provided port. + * @param charset the provided graphite server charset. + * @return {@link #graphiteSenderMock} if all params were the constant test params, regular result otherwise. + */ + @Override + protected GraphiteSender createSender(String host, int port, Charset charset) { + if (TEST_HOST.equals(host) && TEST_PORT == port && TEST_CHARSET.equals(charset)) { + return graphiteSenderMock; + + } + return super.createSender(host, port, charset); + } + } +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java new file mode 100644 index 0000000000..0619e739e3 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.task; + +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.metrics.FlowMetricSet; +import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockReportingContext; +import org.apache.nifi.util.MockReportingInitializationContext; +import org.apache.nifi.util.MockVariableRegistry; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link MetricsReportingTask}. + * + * @author Omer Hadari + */ +@RunWith(MockitoJUnitRunner.class) +public class MetricsReportingTaskTest { + + /** + * Identifier for {@link #reporterServiceStub}. + */ + private static final String REPORTER_SERVICE_IDENTIFIER = "reporter-service"; + + /** + * Id for the group with status {@link #innerGroupStatus}. + */ + private static final String TEST_GROUP_ID = "test-process-group-id"; + + /** + * Id for the {@link #reportingInitContextStub}. + */ + private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id"; + + /** + * Name for {@link #reportingInitContextStub}. + */ + private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name"; + + /** + * Id for the tested tested reporting task. + */ + private static final String TEST_TASK_ID = "test-task-id"; + + + /** + * Stub context, used by {@link MetricsReportingTask#onTrigger(ReportingContext)} for reaching the status. + */ + private MockReportingContext reportingContextStub; + + /** + * Stub context, used by {@link MetricsReportingTask#connect(ConfigurationContext)} for reaching the service. + */ + private MockConfigurationContext configurationContextStub; + + /** + * Stub service for providing {@link #reporterMock}, used for actual reporting + */ + @Mock + private MetricReporterService reporterServiceStub; + + /** + * Mock reporter, used for verifying actual reporting. + */ + @Mock + private ScheduledReporter reporterMock; + + /** + * A status for the "root" process group. + */ + private ProcessGroupStatus rootGroupStatus; + + /** + * Same as {@link #rootGroupStatus}, used when {@link MetricsReportingTask#PROCESS_GROUP_ID} is set. + */ + private ProcessGroupStatus innerGroupStatus; + + /** + * Stub initialization context for calling {@link MetricsReportingTask#initialize(ReportingInitializationContext)}. + */ + private MockReportingInitializationContext reportingInitContextStub; + + /** + * The test subject. + */ + private MetricsReportingTask testedReportingTask; + + /** + * Set up the test environment and mock behaviour. This includes registering {@link #reporterServiceStub} in the + * different contexts, overriding {@link MetricsReportingTask#currentStatusReference} and instantiating the test + * subject. + */ + @Before + public void setUp() throws Exception { + Map services = new HashMap<>(); + services.put(REPORTER_SERVICE_IDENTIFIER, reporterServiceStub); + testedReportingTask = new MetricsReportingTask(); + reportingContextStub = new MockReportingContext( + services, new MockStateManager(testedReportingTask), new MockVariableRegistry()); + + rootGroupStatus = new ProcessGroupStatus(); + innerGroupStatus = new ProcessGroupStatus(); + when(reporterServiceStub.createReporter(any())).thenReturn(reporterMock); + when(reporterServiceStub.getIdentifier()).thenReturn(REPORTER_SERVICE_IDENTIFIER); + reportingContextStub.setProperty(MetricsReportingTask.REPORTER_SERVICE.getName(), REPORTER_SERVICE_IDENTIFIER); + reportingContextStub.addControllerService(reporterServiceStub, REPORTER_SERVICE_IDENTIFIER); + + configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(), + reportingContextStub.getControllerServiceLookup()); + reportingInitContextStub = new MockReportingInitializationContext( + TEST_INIT_CONTEXT_ID, + TEST_INIT_CONTEXT_NAME, + new MockComponentLog(TEST_TASK_ID, testedReportingTask)); + } + + /** + * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus} + * is used and that metrics are actually reported. + */ + @Test + public void testValidLifeCycleReportsCorrectly() throws Exception { + reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock).report(); + + // Verify correct metrics are registered + ArgumentCaptor registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class); + verify(reporterServiceStub).createReporter(registryCaptor.capture()); + MetricRegistry usedRegistry = registryCaptor.getValue(); + Map usedMetrics = usedRegistry.getMetrics(); + assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet())); + assertTrue(usedMetrics.keySet() + .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet())); + + // Verify the most current ProcessGroupStatus is updated + assertEquals(testedReportingTask.currentStatusReference.get(), rootGroupStatus); + } + + /** + * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus} + * is used and that metrics are actually reported. + */ + @Test + public void testValidLifeCycleReportsCorrectlyProcessGroupSpecified() throws Exception { + reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID); + reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock).report(); + + // Verify correct metrics are registered + ArgumentCaptor registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class); + verify(reporterServiceStub).createReporter(registryCaptor.capture()); + MetricRegistry usedRegistry = registryCaptor.getValue(); + Map usedMetrics = usedRegistry.getMetrics(); + assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet())); + assertTrue(usedMetrics.keySet() + .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet())); + + // Verify the most current ProcessGroupStatus is updated + assertEquals(testedReportingTask.currentStatusReference.get(), innerGroupStatus); + } + + /** + * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus} + * is used and that metrics are actually reported. + */ + @Test + public void testInvalidProcessGroupId() throws Exception { + reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID + "-invalid"); + reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock, never()).report(); + assertNull(testedReportingTask.currentStatusReference.get()); + } + + /** + * Make sure that {@link MetricsReportingTask#connect(ConfigurationContext)} does not create a new reporter + * if there is already an active reporter. + */ + @Test + public void testConnectCreatesSingleReporter() throws Exception { + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.connect(configurationContextStub); + + verify(reporterServiceStub, times(1)).createReporter(any()); + } + + /** + * Sanity check for registered properties. + */ + @Test + public void testGetSupportedPropertyDescriptorsSanity() throws Exception { + List expected = Arrays.asList( + MetricsReportingTask.REPORTER_SERVICE, + MetricsReportingTask.PROCESS_GROUP_ID); + assertEquals(expected, testedReportingTask.getSupportedPropertyDescriptors()); + } +} diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml new file mode 100644 index 0000000000..2006ebfc24 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml @@ -0,0 +1,57 @@ + + + + + nifi-nar-bundles + org.apache.nifi + 1.5.0-SNAPSHOT + + 4.0.0 + + nifi-metrics-reporting-bundle + pom + + nifi-metrics-reporting-task + nifi-metrics-reporting-nar + nifi-metrics-reporter-service-api + nifi-metrics-reporter-service-api-nar + + + + + + org.apache.nifi + nifi-metrics-reporting-task + 1.5.0-SNAPSHOT + + + + + + + io.dropwizard.metrics + metrics-core + 3.1.2 + + + org.apache.nifi + nifi-api + provided + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 0771c62033..330dabaab7 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -88,6 +88,7 @@ nifi-extension-utils nifi-grpc-bundle nifi-redis-bundle + nifi-metrics-reporting-bundle diff --git a/pom.xml b/pom.xml index 56961ec2a8..ab4686164e 100644 --- a/pom.xml +++ b/pom.xml @@ -1488,6 +1488,18 @@ nifi-redis-nar 1.5.0-SNAPSHOT nar + + + org.apache.nifi + nifi-metrics-reporter-service-api-nar + 1.5.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-metrics-reporting-nar + 1.5.0-SNAPSHOT + nar org.apache.nifi