mirror of https://github.com/apache/nifi.git
NIFI-4392 Create a MetricReportingTask with GraphiteMetricService
This closes #2171. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
5930c0c212
commit
6201c06c99
|
@ -516,6 +516,16 @@
|
|||
<artifactId>nifi-redis-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporting-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<profile>
|
||||
|
|
|
@ -18,7 +18,9 @@ package org.apache.nifi.util;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.action.Action;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
|
@ -31,10 +33,14 @@ public class MockEventAccess implements EventAccess {
|
|||
private ProcessGroupStatus processGroupStatus;
|
||||
private final List<ProvenanceEventRecord> provenanceRecords = new ArrayList<>();
|
||||
private final List<Action> flowChanges = new ArrayList<>();
|
||||
private final Map<String, ProcessGroupStatus> processGroupStatusMap = new HashMap<>();
|
||||
|
||||
public void setProcessGroupStatus(final ProcessGroupStatus status) {
|
||||
this.processGroupStatus = status;
|
||||
}
|
||||
public void setProcessGroupStatus(String groupId, final ProcessGroupStatus status) {
|
||||
processGroupStatusMap.put(groupId, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessGroupStatus getControllerStatus() {
|
||||
|
@ -43,7 +49,7 @@ public class MockEventAccess implements EventAccess {
|
|||
|
||||
@Override
|
||||
public ProcessGroupStatus getGroupStatus(final String groupId) {
|
||||
return null;
|
||||
return processGroupStatusMap.get(groupId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-metrics-reporting-bundle</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<packaging>nar</packaging>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporter-service-api</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,25 @@
|
|||
nifi-metrics-reporter-service-api-nar
|
||||
Copyright 2015-2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
******************
|
||||
Apache Software License v2
|
||||
******************
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Dropwizard Metrics
|
||||
The following NOTICE information applies:
|
||||
Metrics
|
||||
Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
|
||||
|
||||
This product includes software developed by Coda Hale and Yammer, Inc.
|
||||
|
||||
This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
|
||||
LongAdder), which was released with the following comments:
|
||||
|
||||
Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
Expert Group and released to the public domain, as explained at
|
||||
http://creativecommons.org/publicdomain/zero/1.0/
|
|
@ -0,0 +1,27 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-metrics-reporting-bundle</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-metrics-reporter-service-api</artifactId>
|
||||
</project>
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics.reporting.reporter.service;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
/**
|
||||
* An interface for controller services used by MetricsReportingTask. In order to report to a new
|
||||
* client, implement this interface and make sure to return the desired implementation of {@link ScheduledReporter}.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
*/
|
||||
public interface MetricReporterService extends ControllerService {
|
||||
|
||||
/**
|
||||
* Create a reporter to a metric client (i.e. graphite).
|
||||
*
|
||||
* @param metricRegistry registry with the metrics to report.
|
||||
* @return an instance of the reporter.
|
||||
* @throws ProcessException if there was an error creating the reporter.
|
||||
*/
|
||||
ScheduledReporter createReporter(MetricRegistry metricRegistry) throws ProcessException;
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-metrics-reporting-bundle</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<packaging>nar</packaging>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-metrics-reporting-nar</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporting-task</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,25 @@
|
|||
nifi-metrics-reporting-nar
|
||||
Copyright 2015-2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
******************
|
||||
Apache Software License v2
|
||||
******************
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Dropwizard Metrics
|
||||
The following NOTICE information applies:
|
||||
Metrics
|
||||
Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team
|
||||
|
||||
This product includes software developed by Coda Hale and Yammer, Inc.
|
||||
|
||||
This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64,
|
||||
LongAdder), which was released with the following comments:
|
||||
|
||||
Written by Doug Lea with assistance from members of JCP JSR-166
|
||||
Expert Group and released to the public domain, as explained at
|
||||
http://creativecommons.org/publicdomain/zero/1.0/
|
|
@ -0,0 +1,55 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-metrics-reporting-bundle</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-metrics-reporting-task</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporter-service-api</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-graphite</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-jvm</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.Metric;
|
||||
import com.codahale.metrics.MetricSet;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.controller.status.ProcessorStatus;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* A metric set of NiFi instance related metrics.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
*/
|
||||
public class FlowMetricSet implements MetricSet {
|
||||
|
||||
|
||||
/**
|
||||
* Reference to the process status that should be reported. Should be updated when the status changes.
|
||||
*/
|
||||
private final AtomicReference<ProcessGroupStatus> currentStatusReference;
|
||||
|
||||
/**
|
||||
* Create a metric set that will look at a given process status reference for deciding metrics.
|
||||
*
|
||||
* @param currentStatusReference a reference to the process status.
|
||||
*/
|
||||
public FlowMetricSet(AtomicReference<ProcessGroupStatus> currentStatusReference) {
|
||||
this.currentStatusReference = currentStatusReference;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a map of {@link Gauge}s for the {@link #currentStatusReference}. This methods reports the metrics as
|
||||
* found in the reference.
|
||||
*
|
||||
* @return map between the metric name and a {@link Gauge} to it's value.
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Metric> getMetrics() {
|
||||
|
||||
Map<String, Metric> metrics = new HashMap<>();
|
||||
|
||||
metrics.put(MetricNames.ACTIVE_THREADS, (Gauge<Integer>) () -> currentStatusReference.get().getActiveThreadCount());
|
||||
metrics.put(MetricNames.BYTES_QUEUED, (Gauge<Long>) () -> currentStatusReference.get().getQueuedContentSize());
|
||||
metrics.put(MetricNames.BYTES_READ, (Gauge<Long>) () -> currentStatusReference.get().getBytesRead());
|
||||
metrics.put(MetricNames.BYTES_RECEIVED, (Gauge<Long>) () -> currentStatusReference.get().getBytesReceived());
|
||||
metrics.put(MetricNames.BYTES_SENT, (Gauge<Long>) () -> currentStatusReference.get().getBytesSent());
|
||||
metrics.put(MetricNames.BYTES_WRITTEN, (Gauge<Long>) () -> currentStatusReference.get().getBytesWritten());
|
||||
metrics.put(MetricNames.FLOW_FILES_RECEIVED, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesReceived());
|
||||
metrics.put(MetricNames.FLOW_FILES_QUEUED, (Gauge<Integer>) () -> currentStatusReference.get().getQueuedCount());
|
||||
metrics.put(MetricNames.FLOW_FILES_SENT, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesSent());
|
||||
metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, (Gauge<Long>) () -> calculateProcessingNanos(currentStatusReference.get()));
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the total processing time of a process group.
|
||||
*
|
||||
* @param status the current process group status.
|
||||
* @return the total amount of nanoseconds spent in each processor in the process group.
|
||||
*/
|
||||
private long calculateProcessingNanos(final ProcessGroupStatus status) {
|
||||
long nanos = 0L;
|
||||
|
||||
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
|
||||
nanos += procStats.getProcessingNanos();
|
||||
}
|
||||
|
||||
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
|
||||
nanos += calculateProcessingNanos(childGroupStatus);
|
||||
}
|
||||
|
||||
return nanos;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics;
|
||||
|
||||
/**
|
||||
* The Metric names to send to Ambari.
|
||||
*/
|
||||
public interface MetricNames {
|
||||
|
||||
// NiFi Metrics
|
||||
String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
|
||||
String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
|
||||
String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
|
||||
String BYTES_SENT = "BytesSentLast5Minutes";
|
||||
String FLOW_FILES_QUEUED = "FlowFilesQueued";
|
||||
String BYTES_QUEUED = "BytesQueued";
|
||||
String BYTES_READ = "BytesReadLast5Minutes";
|
||||
String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
|
||||
String ACTIVE_THREADS = "ActiveThreads";
|
||||
String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics.reporting.reporter.service;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.codahale.metrics.graphite.Graphite;
|
||||
import com.codahale.metrics.graphite.GraphiteReporter;
|
||||
import com.codahale.metrics.graphite.GraphiteSender;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.metrics.reporting.task.MetricsReportingTask;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A controller service that provides metric reporters for graphite, can be used by {@link MetricsReportingTask}.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
*/
|
||||
@Tags({"metrics", "reporting", "graphite"})
|
||||
@CapabilityDescription("A controller service that provides metric reporters for graphite. " +
|
||||
"Used by MetricsReportingTask.")
|
||||
public class GraphiteMetricReporterService extends AbstractControllerService implements MetricReporterService {
|
||||
|
||||
/**
|
||||
* Points to the hostname of the graphite listener.
|
||||
*/
|
||||
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
|
||||
.name("host")
|
||||
.displayName("Host")
|
||||
.description("The hostname of the carbon listener")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.URI_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Points to the port on which the graphite server listens.
|
||||
*/
|
||||
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
||||
.name("port")
|
||||
.displayName("Port")
|
||||
.description("The port on which carbon listens")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Points to the charset name that the graphite server expects.
|
||||
*/
|
||||
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||
.name("charset")
|
||||
.displayName("Charset")
|
||||
.description("The charset used by the graphite server")
|
||||
.required(true)
|
||||
.defaultValue("UTF-8")
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Prefix for all metric names sent by reporters - for separation of NiFi stats in graphite.
|
||||
*/
|
||||
protected static final PropertyDescriptor METRIC_NAME_PREFIX = new PropertyDescriptor.Builder()
|
||||
.name("metric name prefix")
|
||||
.displayName("Metric Name Prefix")
|
||||
.description("A prefix that will be used for all metric names sent by reporters provided by this service.")
|
||||
.required(true)
|
||||
.defaultValue("nifi")
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* List of property descriptors used by the service.
|
||||
*/
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(HOST);
|
||||
props.add(PORT);
|
||||
props.add(CHARSET);
|
||||
props.add(METRIC_NAME_PREFIX);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
/**
|
||||
* Graphite sender, a connection to the server.
|
||||
*/
|
||||
private GraphiteSender graphiteSender;
|
||||
|
||||
/**
|
||||
* The configured {@link #METRIC_NAME_PREFIX} value.
|
||||
*/
|
||||
private String metricNamePrefix;
|
||||
|
||||
/**
|
||||
* Create the {@link #graphiteSender} according to configuration.
|
||||
*
|
||||
* @param context used to access properties.
|
||||
*/
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue();
|
||||
int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
|
||||
Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
graphiteSender = createSender(host, port, charset);
|
||||
metricNamePrefix = context.getProperty(METRIC_NAME_PREFIX).evaluateAttributeExpressions().getValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the graphite sender.
|
||||
*
|
||||
* @throws IOException if failed to close the connection.
|
||||
*/
|
||||
@OnDisabled
|
||||
public void shutdown() throws IOException {
|
||||
try {
|
||||
graphiteSender.close();
|
||||
} finally {
|
||||
graphiteSender = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use the {@link #graphiteSender} in order to create a reporter.
|
||||
*
|
||||
* @param metricRegistry registry with the metrics to report.
|
||||
* @return a reporter instance.
|
||||
*/
|
||||
@Override
|
||||
public ScheduledReporter createReporter(MetricRegistry metricRegistry) {
|
||||
return GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricNamePrefix).build(graphiteSender);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a sender.
|
||||
*
|
||||
* @param host the hostname of the server to connect to.
|
||||
* @param port the port on which the server listens.
|
||||
* @param charset the charset in which the server expects logs.
|
||||
* @return The created sender.
|
||||
*/
|
||||
protected GraphiteSender createSender(String host, int port, Charset charset) {
|
||||
return new Graphite(host, port, SocketFactory.getDefault(), charset);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics.reporting.task;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.metrics.FlowMetricSet;
|
||||
import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.AbstractReportingTask;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.ReportingInitializationContext;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* A reporting task for NiFi instance and JVM related metrics.
|
||||
* <p>
|
||||
* This task reports metrics to services according to a provided {@link ScheduledReporter}, reached by using a
|
||||
* {@link MetricReporterService}. In order to report to different clients, simply use different implementations of
|
||||
* the controller service.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
* @see MetricReporterService
|
||||
*/
|
||||
@Tags({"metrics", "reporting"})
|
||||
@CapabilityDescription("This reporting task reports a set of metrics regarding the JVM and the NiFi instance" +
|
||||
"to a reporter. The reporter is provided by a MetricReporterService. It can be optionally used for a specific" +
|
||||
"process group if a property with the group id is provided.")
|
||||
public class MetricsReportingTask extends AbstractReportingTask {
|
||||
|
||||
/**
|
||||
* Points to the service which provides {@link ScheduledReporter} instances.
|
||||
*/
|
||||
protected static final PropertyDescriptor REPORTER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("metric reporter service")
|
||||
.displayName("Metric Reporter Service")
|
||||
.description("The service that provides a reporter for the gathered metrics")
|
||||
.identifiesControllerService(MetricReporterService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Metrics of the process group with this ID should be reported. If not specified, use the root process group.
|
||||
*/
|
||||
protected static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
|
||||
.name("process group id")
|
||||
.displayName("Process Group ID")
|
||||
.description("The id of the process group to report. If not specified, metrics of the root process group" +
|
||||
"are reported.")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Contains the metrics that should be reported.
|
||||
*/
|
||||
private MetricRegistry metricRegistry;
|
||||
|
||||
/**
|
||||
* Used for actually reporting metrics.
|
||||
*/
|
||||
private ScheduledReporter reporter;
|
||||
|
||||
// Protected for testing sake. DO NOT ACCESS FOR OTHER PURPOSES.
|
||||
/**
|
||||
* Points to the most recent process group status seen by this task.
|
||||
*/
|
||||
protected AtomicReference<ProcessGroupStatus> currentStatusReference;
|
||||
|
||||
/**
|
||||
* Register all wanted metrics to {@link #metricRegistry}.
|
||||
* <p>
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected void init(ReportingInitializationContext config) {
|
||||
metricRegistry = new MetricRegistry();
|
||||
currentStatusReference = new AtomicReference<>();
|
||||
metricRegistry.registerAll(new MemoryUsageGaugeSet());
|
||||
metricRegistry.registerAll(new FlowMetricSet(currentStatusReference));
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate {@link #reporter} using the {@link MetricReporterService}. If the reporter is active already,
|
||||
* do nothing.
|
||||
*
|
||||
* @param context used for accessing the controller service.
|
||||
*/
|
||||
@OnScheduled
|
||||
public void connect(ConfigurationContext context) {
|
||||
if (reporter == null) {
|
||||
reporter = ((MetricReporterService) context.getProperty(REPORTER_SERVICE).asControllerService())
|
||||
.createReporter(metricRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report the registered metrics.
|
||||
*
|
||||
* @param context used for getting the most recent {@link ProcessGroupStatus}.
|
||||
*/
|
||||
@Override
|
||||
public void onTrigger(ReportingContext context) {
|
||||
String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue();
|
||||
|
||||
ProcessGroupStatus statusToReport = groupId == null
|
||||
? context.getEventAccess().getControllerStatus()
|
||||
: context.getEventAccess().getGroupStatus(groupId);
|
||||
|
||||
if (statusToReport != null) {
|
||||
currentStatusReference.set(statusToReport);
|
||||
reporter.report();
|
||||
} else {
|
||||
getLogger().error("Process group with provided group id could not be found.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(REPORTER_SERVICE);
|
||||
properties.add(PROCESS_GROUP_ID);
|
||||
return Collections.unmodifiableList(properties);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.metrics.reporting.reporter.service.GraphiteMetricReporterService
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.metrics.reporting.task.MetricsReportingTask
|
|
@ -0,0 +1,211 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics.reporting.reporter.service;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.codahale.metrics.graphite.GraphiteSender;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test class for {@link GraphiteMetricReporterService}.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class GraphiteMetricReporterServiceTest {
|
||||
|
||||
/**
|
||||
* Service identifier for registerting the tested service to the tests runner.
|
||||
*/
|
||||
private static final String SERVICE_IDENTIFIER = "graphite-metric-reporter-service";
|
||||
|
||||
/**
|
||||
* Sample host name for the {@link GraphiteMetricReporterService#HOST} property.
|
||||
*/
|
||||
private static final String TEST_HOST = "some-host";
|
||||
|
||||
/**
|
||||
* Sample port for the {@link GraphiteMetricReporterService#PORT} property.
|
||||
*/
|
||||
private static final int TEST_PORT = 12345;
|
||||
|
||||
/**
|
||||
* Sample charset for the {@link GraphiteMetricReporterService#CHARSET} property.
|
||||
*/
|
||||
private static final Charset TEST_CHARSET = StandardCharsets.UTF_16LE;
|
||||
|
||||
/**
|
||||
* Sample prefix for metric names.
|
||||
*/
|
||||
private static final String METRIC_NAMES_PREFIX = "test-metric-name-prefix";
|
||||
|
||||
/**
|
||||
* Sample metric for verifying that a graphite sender with the correct configuration is used.
|
||||
*/
|
||||
private static final String TEST_METRIC_NAME = "test-metric";
|
||||
|
||||
/**
|
||||
* The fixed value of {@link #TEST_METRIC_NAME}.
|
||||
*/
|
||||
private static final int TEST_METRIC_VALUE = 2;
|
||||
|
||||
/**
|
||||
* Dummy processor for creating {@link #runner}.
|
||||
*/
|
||||
@Mock
|
||||
private Processor processorDummy;
|
||||
|
||||
/**
|
||||
* Mock sender for verifying creation with the correct configuration.
|
||||
*/
|
||||
@Mock
|
||||
private GraphiteSender graphiteSenderMock;
|
||||
|
||||
/**
|
||||
* Stub metric registry, that contains the test metrics.
|
||||
*/
|
||||
private MetricRegistry metricRegistryStub;
|
||||
|
||||
/**
|
||||
* Test runner for activating and configuring the service.
|
||||
*/
|
||||
private TestRunner runner;
|
||||
|
||||
/**
|
||||
* The test subject.
|
||||
*/
|
||||
private GraphiteMetricReporterService testedService;
|
||||
|
||||
/**
|
||||
* Instantiate the runner and mocks between tests. Register metrics to the {@link #metricRegistryStub}.
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
runner = TestRunners.newTestRunner(processorDummy);
|
||||
testedService = new GraphiteMetricReporterService();
|
||||
|
||||
metricRegistryStub = new MetricRegistry();
|
||||
metricRegistryStub.register(TEST_METRIC_NAME, ((Gauge<Integer>) () -> TEST_METRIC_VALUE));
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make sure that a correctly configured service can be activated.
|
||||
*/
|
||||
@Test
|
||||
public void testGraphiteMetricReporterSanityConfiguration() throws Exception {
|
||||
runner.addControllerService(SERVICE_IDENTIFIER, testedService);
|
||||
setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
|
||||
runner.enableControllerService(testedService);
|
||||
|
||||
runner.assertValid(testedService);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make sure that a correctly configured service provides a reporter for the matching configuration, and
|
||||
* actually reports to the correct address.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateReporterUsesCorrectSender() throws Exception {
|
||||
testedService = new TestableGraphiteMetricReporterService();
|
||||
runner.addControllerService(SERVICE_IDENTIFIER, testedService);
|
||||
setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
|
||||
when(graphiteSenderMock.isConnected()).thenReturn(false);
|
||||
runner.enableControllerService(testedService);
|
||||
|
||||
ScheduledReporter createdReporter = testedService.createReporter(metricRegistryStub);
|
||||
createdReporter.report();
|
||||
|
||||
String expectedMetricName = MetricRegistry.name(METRIC_NAMES_PREFIX, TEST_METRIC_NAME);
|
||||
verify(graphiteSenderMock).send(eq(expectedMetricName), eq(String.valueOf(TEST_METRIC_VALUE)), anyLong());
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that {@link GraphiteMetricReporterService#shutdown()} closes the connection to graphite.
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownClosesSender() throws Exception {
|
||||
testedService = new TestableGraphiteMetricReporterService();
|
||||
runner.addControllerService(SERVICE_IDENTIFIER, testedService);
|
||||
setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX);
|
||||
runner.enableControllerService(testedService);
|
||||
runner.disableControllerService(testedService);
|
||||
|
||||
verify(graphiteSenderMock).close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the test subject's properties.
|
||||
*
|
||||
* @param host populates {@link GraphiteMetricReporterService#HOST}.
|
||||
* @param port populates {@link GraphiteMetricReporterService#PORT}.
|
||||
* @param charset populates {@link GraphiteMetricReporterService#CHARSET}.
|
||||
* @param metricNamesPrefix populates {@link GraphiteMetricReporterService#METRIC_NAME_PREFIX}.
|
||||
*/
|
||||
private void setServiceProperties(String host, int port, Charset charset, String metricNamesPrefix) {
|
||||
runner.setProperty(testedService, GraphiteMetricReporterService.HOST, host);
|
||||
runner.setProperty(testedService, GraphiteMetricReporterService.PORT, String.valueOf(port));
|
||||
runner.setProperty(testedService, GraphiteMetricReporterService.CHARSET, charset.name());
|
||||
runner.setProperty(testedService, GraphiteMetricReporterService.METRIC_NAME_PREFIX, metricNamesPrefix);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is a patch. It overrides {@link GraphiteMetricReporterService#createSender(String, int, Charset)}
|
||||
* so that it is possible to verify a correct creation of graphite senders according to property values.
|
||||
*/
|
||||
private class TestableGraphiteMetricReporterService extends GraphiteMetricReporterService {
|
||||
|
||||
/**
|
||||
* Overrides the actual methods in order to inject the mock {@link #graphiteSenderMock}.
|
||||
* <p>
|
||||
* If this method is called with the test property values, it returns the mock. Otherwise operate
|
||||
* regularly.
|
||||
*
|
||||
* @param host the provided hostname.
|
||||
* @param port the provided port.
|
||||
* @param charset the provided graphite server charset.
|
||||
* @return {@link #graphiteSenderMock} if all params were the constant test params, regular result otherwise.
|
||||
*/
|
||||
@Override
|
||||
protected GraphiteSender createSender(String host, int port, Charset charset) {
|
||||
if (TEST_HOST.equals(host) && TEST_PORT == port && TEST_CHARSET.equals(charset)) {
|
||||
return graphiteSenderMock;
|
||||
|
||||
}
|
||||
return super.createSender(host, port, charset);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,254 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.metrics.reporting.task;
|
||||
|
||||
import com.codahale.metrics.Metric;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.status.ProcessGroupStatus;
|
||||
import org.apache.nifi.metrics.FlowMetricSet;
|
||||
import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.ReportingInitializationContext;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockReportingContext;
|
||||
import org.apache.nifi.util.MockReportingInitializationContext;
|
||||
import org.apache.nifi.util.MockVariableRegistry;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test class for {@link MetricsReportingTask}.
|
||||
*
|
||||
* @author Omer Hadari
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class MetricsReportingTaskTest {
|
||||
|
||||
/**
|
||||
* Identifier for {@link #reporterServiceStub}.
|
||||
*/
|
||||
private static final String REPORTER_SERVICE_IDENTIFIER = "reporter-service";
|
||||
|
||||
/**
|
||||
* Id for the group with status {@link #innerGroupStatus}.
|
||||
*/
|
||||
private static final String TEST_GROUP_ID = "test-process-group-id";
|
||||
|
||||
/**
|
||||
* Id for the {@link #reportingInitContextStub}.
|
||||
*/
|
||||
private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
|
||||
|
||||
/**
|
||||
* Name for {@link #reportingInitContextStub}.
|
||||
*/
|
||||
private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
|
||||
|
||||
/**
|
||||
* Id for the tested tested reporting task.
|
||||
*/
|
||||
private static final String TEST_TASK_ID = "test-task-id";
|
||||
|
||||
|
||||
/**
|
||||
* Stub context, used by {@link MetricsReportingTask#onTrigger(ReportingContext)} for reaching the status.
|
||||
*/
|
||||
private MockReportingContext reportingContextStub;
|
||||
|
||||
/**
|
||||
* Stub context, used by {@link MetricsReportingTask#connect(ConfigurationContext)} for reaching the service.
|
||||
*/
|
||||
private MockConfigurationContext configurationContextStub;
|
||||
|
||||
/**
|
||||
* Stub service for providing {@link #reporterMock}, used for actual reporting
|
||||
*/
|
||||
@Mock
|
||||
private MetricReporterService reporterServiceStub;
|
||||
|
||||
/**
|
||||
* Mock reporter, used for verifying actual reporting.
|
||||
*/
|
||||
@Mock
|
||||
private ScheduledReporter reporterMock;
|
||||
|
||||
/**
|
||||
* A status for the "root" process group.
|
||||
*/
|
||||
private ProcessGroupStatus rootGroupStatus;
|
||||
|
||||
/**
|
||||
* Same as {@link #rootGroupStatus}, used when {@link MetricsReportingTask#PROCESS_GROUP_ID} is set.
|
||||
*/
|
||||
private ProcessGroupStatus innerGroupStatus;
|
||||
|
||||
/**
|
||||
* Stub initialization context for calling {@link MetricsReportingTask#initialize(ReportingInitializationContext)}.
|
||||
*/
|
||||
private MockReportingInitializationContext reportingInitContextStub;
|
||||
|
||||
/**
|
||||
* The test subject.
|
||||
*/
|
||||
private MetricsReportingTask testedReportingTask;
|
||||
|
||||
/**
|
||||
* Set up the test environment and mock behaviour. This includes registering {@link #reporterServiceStub} in the
|
||||
* different contexts, overriding {@link MetricsReportingTask#currentStatusReference} and instantiating the test
|
||||
* subject.
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Map<String, ControllerService> services = new HashMap<>();
|
||||
services.put(REPORTER_SERVICE_IDENTIFIER, reporterServiceStub);
|
||||
testedReportingTask = new MetricsReportingTask();
|
||||
reportingContextStub = new MockReportingContext(
|
||||
services, new MockStateManager(testedReportingTask), new MockVariableRegistry());
|
||||
|
||||
rootGroupStatus = new ProcessGroupStatus();
|
||||
innerGroupStatus = new ProcessGroupStatus();
|
||||
when(reporterServiceStub.createReporter(any())).thenReturn(reporterMock);
|
||||
when(reporterServiceStub.getIdentifier()).thenReturn(REPORTER_SERVICE_IDENTIFIER);
|
||||
reportingContextStub.setProperty(MetricsReportingTask.REPORTER_SERVICE.getName(), REPORTER_SERVICE_IDENTIFIER);
|
||||
reportingContextStub.addControllerService(reporterServiceStub, REPORTER_SERVICE_IDENTIFIER);
|
||||
|
||||
configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(),
|
||||
reportingContextStub.getControllerServiceLookup());
|
||||
reportingInitContextStub = new MockReportingInitializationContext(
|
||||
TEST_INIT_CONTEXT_ID,
|
||||
TEST_INIT_CONTEXT_NAME,
|
||||
new MockComponentLog(TEST_TASK_ID, testedReportingTask));
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus}
|
||||
* is used and that metrics are actually reported.
|
||||
*/
|
||||
@Test
|
||||
public void testValidLifeCycleReportsCorrectly() throws Exception {
|
||||
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
|
||||
|
||||
testedReportingTask.initialize(reportingInitContextStub);
|
||||
testedReportingTask.connect(configurationContextStub);
|
||||
testedReportingTask.onTrigger(reportingContextStub);
|
||||
verify(reporterMock).report();
|
||||
|
||||
// Verify correct metrics are registered
|
||||
ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class);
|
||||
verify(reporterServiceStub).createReporter(registryCaptor.capture());
|
||||
MetricRegistry usedRegistry = registryCaptor.getValue();
|
||||
Map<String, Metric> usedMetrics = usedRegistry.getMetrics();
|
||||
assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet()));
|
||||
assertTrue(usedMetrics.keySet()
|
||||
.containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet()));
|
||||
|
||||
// Verify the most current ProcessGroupStatus is updated
|
||||
assertEquals(testedReportingTask.currentStatusReference.get(), rootGroupStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus}
|
||||
* is used and that metrics are actually reported.
|
||||
*/
|
||||
@Test
|
||||
public void testValidLifeCycleReportsCorrectlyProcessGroupSpecified() throws Exception {
|
||||
reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID);
|
||||
reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus);
|
||||
|
||||
testedReportingTask.initialize(reportingInitContextStub);
|
||||
testedReportingTask.connect(configurationContextStub);
|
||||
testedReportingTask.onTrigger(reportingContextStub);
|
||||
verify(reporterMock).report();
|
||||
|
||||
// Verify correct metrics are registered
|
||||
ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class);
|
||||
verify(reporterServiceStub).createReporter(registryCaptor.capture());
|
||||
MetricRegistry usedRegistry = registryCaptor.getValue();
|
||||
Map<String, Metric> usedMetrics = usedRegistry.getMetrics();
|
||||
assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet()));
|
||||
assertTrue(usedMetrics.keySet()
|
||||
.containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet()));
|
||||
|
||||
// Verify the most current ProcessGroupStatus is updated
|
||||
assertEquals(testedReportingTask.currentStatusReference.get(), innerGroupStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus}
|
||||
* is used and that metrics are actually reported.
|
||||
*/
|
||||
@Test
|
||||
public void testInvalidProcessGroupId() throws Exception {
|
||||
reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID + "-invalid");
|
||||
reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus);
|
||||
|
||||
testedReportingTask.initialize(reportingInitContextStub);
|
||||
testedReportingTask.connect(configurationContextStub);
|
||||
testedReportingTask.onTrigger(reportingContextStub);
|
||||
verify(reporterMock, never()).report();
|
||||
assertNull(testedReportingTask.currentStatusReference.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that {@link MetricsReportingTask#connect(ConfigurationContext)} does not create a new reporter
|
||||
* if there is already an active reporter.
|
||||
*/
|
||||
@Test
|
||||
public void testConnectCreatesSingleReporter() throws Exception {
|
||||
testedReportingTask.initialize(reportingInitContextStub);
|
||||
testedReportingTask.connect(configurationContextStub);
|
||||
testedReportingTask.connect(configurationContextStub);
|
||||
|
||||
verify(reporterServiceStub, times(1)).createReporter(any());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanity check for registered properties.
|
||||
*/
|
||||
@Test
|
||||
public void testGetSupportedPropertyDescriptorsSanity() throws Exception {
|
||||
List<PropertyDescriptor> expected = Arrays.asList(
|
||||
MetricsReportingTask.REPORTER_SERVICE,
|
||||
MetricsReportingTask.PROCESS_GROUP_ID);
|
||||
assertEquals(expected, testedReportingTask.getSupportedPropertyDescriptors());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-nar-bundles</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-metrics-reporting-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>nifi-metrics-reporting-task</module>
|
||||
<module>nifi-metrics-reporting-nar</module>
|
||||
<module>nifi-metrics-reporter-service-api</module>
|
||||
<module>nifi-metrics-reporter-service-api-nar</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporting-task</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -88,6 +88,7 @@
|
|||
<module>nifi-extension-utils</module>
|
||||
<module>nifi-grpc-bundle</module>
|
||||
<module>nifi-redis-bundle</module>
|
||||
<module>nifi-metrics-reporting-bundle</module>
|
||||
</modules>
|
||||
|
||||
<build>
|
||||
|
|
12
pom.xml
12
pom.xml
|
@ -1488,6 +1488,18 @@
|
|||
<artifactId>nifi-redis-nar</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-metrics-reporting-nar</artifactId>
|
||||
<version>1.5.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
|
Loading…
Reference in New Issue