NIFI-6583: adding azure log analytics reporting task to nifi-azure-bundle

This closes #3817.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
sjyang18 2019-10-15 18:12:07 +00:00 committed by Mark Payne
parent 388683a5c5
commit 0acdde827e
13 changed files with 1385 additions and 2 deletions

View File

@ -35,13 +35,17 @@
<artifactId>nifi-azure-processors</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-reporting-task</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api-nar</artifactId>
<version>1.11.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId>
<version>1.10.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-azure-reporting-task</artifactId>
<packaging>jar</packaging>
<properties>
<nifi.version>1.10.0-SNAPSHOT</nifi.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-metrics</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,328 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.MessageFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
/**
* ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
*/
@Tags({"reporting", "loganalytics", "metrics"})
@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace." +
"Apache NiFi-metrics can be either configured global or on process-group level.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
private static final String JVM_JOB_NAME = "jvm_global";
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String HMAC_SHA256_ALG = "HmacSHA256";
private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
private final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Id")
.description("Log Analytics Workspace Id")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
.name("Log Analytics Custom Log Name")
.description("Log Analytics Custom Log Name")
.required(false)
.defaultValue("nifimetrics")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Key")
.description("Azure Log Analytic Worskspace Key")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
.name("Application ID")
.description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
.name("Process group ID(s)")
.description("If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+ " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators
.createListValidator(true, true, StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
.build();
static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
.name("Job Name")
.description("The name of the exporting job")
.defaultValue("nifi_reporting_job")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
.name("Send JVM Metrics")
.description("Send JVM Metrics in addition to the NiFi-metrics")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
.name("Log Analytics URL Endpoint Format")
.description("Log Analytics URL Endpoint Format")
.required(false)
.defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
try {
// Documentation: https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, rfc1123Date);
Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
return String.format("SharedKey %s:%s", workspaceId, hmac);
} catch (NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LOG_ANALYTICS_WORKSPACE_ID);
properties.add(LOG_ANALYTICS_CUSTOM_LOG_NAME);
properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
properties.add(APPLICATION_ID);
properties.add(INSTANCE_ID);
properties.add(PROCESS_GROUP_IDS);
properties.add(JOB_NAME);
properties.add(SEND_JVM_METRICS);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
return properties;
}
@Override
public void onTrigger(final ReportingContext context){
final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
final boolean jvmMetricsCollected = context.getProperty(SEND_JVM_METRICS).asBoolean();
final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
final String instanceId = context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
final String groupIds = context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
try {
List<Metric> allMetrics = null;
if(groupIds == null || groupIds.isEmpty()) {
ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
String processGroupName = status.getName();
allMetrics = collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected);
} else {
allMetrics = new ArrayList<>();
for(String groupId: groupIds.split(",")) {
groupId =groupId.trim();
ProcessGroupStatus status = context.getEventAccess().getGroupStatus(groupId);
String processGroupName = status.getName();
allMetrics.addAll(collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected));
}
}
HttpPost httpPost = getHttpPost(urlEndpointFormat, workspaceId, logName);
sendMetrics(httpPost, workspaceId, linuxPrimaryKey, allMetrics);
} catch (Exception e) {
getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
}
}
/**
* Construct HttpPost and return it
* @param urlFormat URL format to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
* @param logName log table name where metrics will be pushed
* @return HttpsURLConnection to your azure log analytics workspace
* @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
*/
protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
throws IllegalArgumentException {
String dataCollectorEndpoint =
MessageFormat.format(urlFormat, workspaceId);
HttpPost post = new HttpPost(dataCollectorEndpoint);
post.addHeader("Content-Type", "application/json");
post.addHeader("Log-Type", logName);
return post;
}
/**
* send collected metrics to azure log analytics workspace
* @param request HttpPost to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
* @param linuxPrimaryKey your azure log analytics workspace key
* @param allMetrics collected metrics to be sent
* @throws IOException when there is an error in https url connection or read/write to the onnection
* @throws IllegalArgumentException when there a exception in converting metrics to json string with Gson.toJson
* @throws RuntimeException when httpPost fails with none 200 status code
*/
protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
throws IOException, IllegalArgumentException, RuntimeException {
Gson gson = new GsonBuilder().create();
StringBuilder builder = new StringBuilder();
builder.append('[');
for (Metric current : allMetrics) {
builder.append(gson.toJson(current));
builder.append(',');
}
builder.append(']');
final String rawJson = builder.toString();
final int bodyLength = rawJson.getBytes(UTF8).length;
final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
request.addHeader("Authorization", createAuthorization);
request.addHeader("x-ms-date", nowRfc1123);
request.setEntity(new StringEntity(rawJson));
try(CloseableHttpClient httpClient = HttpClients.createDefault()){
postRequest(httpClient, request);
}
}
/**
* post request with httpClient and httpPost
* @param httpClient HttpClient
* @param request HttpPost
* @throws IOException if httpClient.execute fails
* @throws RuntimeException if post request status return other than 200
*/
protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
throws IOException, RuntimeException {
try (CloseableHttpResponse response = httpClient.execute(request)) {
if(response != null && response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(response.getStatusLine().toString());
}
}
}
/**
* collect metrics to be sent to azure log analytics workspace
* @param instanceId instance id
* @param status process group status
* @param processGroupName process group name
* @param jvmMetricsCollected whether we want to collect jvm metrics or not
* @return list of metrics collected
*/
private List<Metric> collectMetrics(final String instanceId,
final ProcessGroupStatus status, final String processGroupName, final boolean jvmMetricsCollected) {
List<Metric> allMetrics = new ArrayList<>();
// dataflow process group level metrics
allMetrics.addAll(AzureLogAnalyticsMetricsFactory.getDataFlowMetrics(status, instanceId));
// connections process group level metrics
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(status, connectionStatuses);
for (ConnectionStatus connectionStatus: connectionStatuses) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId, processGroupName));
}
// processor level metrics
final List<ProcessorStatus> processorStatuses = new ArrayList<>();
populateProcessorStatuses(status, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName)
);
}
if (jvmMetricsCollected) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
}
return allMetrics;
}
private void populateProcessorStatuses(final ProcessGroupStatus groupStatus, final List<ProcessorStatus> statuses) {
statuses.addAll(groupStatus.getProcessorStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateProcessorStatuses(childGroupStatus, statuses);
}
}
private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
statuses.addAll(groupStatus.getConnectionStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateConnectionStatuses(childGroupStatus, statuses);
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import com.google.gson.annotations.SerializedName;
public class Metric {
public static final String CATEGORY_DATAFLOW = "DataFlow";
public static final String CATEGORY_CONNECTIONS = "Connections";
public static final String CATEGORY_PROCESSOR = "Processor";
public static final String CATEGORY_JVM = "JvmMetrics";
@SerializedName("Computer") String computer;
@SerializedName("ProcessGroupId") private String processGroupId;
@SerializedName("ProcessGroupName") private String processGroupName;
@SerializedName("ProcessorId") private String processorId;
@SerializedName("ProcessorName") private String processorName;
@SerializedName("Count") private Long count;
@SerializedName("Name") private String name;
@SerializedName("CategoryName") private String categoryName;
@SerializedName("Tags") private String tags;
public Metric(String instanceId, String processGroupId, String processGroupName ) {
this.computer = instanceId;
this.processGroupName = processGroupName;
this.processGroupId = processGroupId;
}
public void setCount(long value){
this.count = Long.valueOf((long)value);
}
public void setCount(double value){
this.count = Long.valueOf((long)value);
}
public void setCount(int value){
this.count = Long.valueOf((long)value);
}
public Long getCount() {
return this.count;
}
public String getComputer() {
return computer;
}
public void setCoumputer(String computer) {
this.computer = computer;
}
public String getProcessGroupId() {
return processGroupId;
}
public void setProcessGroupId(String processGroupId) {
this.processGroupId = processGroupId;
}
public String getProcessGroupName() {
return processGroupName;
}
public void setProcessGroupName(String processGroupName) {
this.processGroupName = processGroupName;
}
public String getProcessorId() {
return processorId;
}
public void setProcessorId(String processorId) {
this.processorId = processorId;
}
public String getProcessorName() {
return processorName;
}
public void setProcessorName(String processorName) {
this.processorName = processorName;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCategoryName() {
return categoryName;
}
public void setCategoryName(String categoryName) {
this.categoryName = categoryName;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
/**
* The Metric names to send to Azure Log Analytics.
*/
public interface MetricNames {
// Metric Name separator
String METRIC_NAME_SEPARATOR = ".";
// NiFi Metrics
String FLOW_FILES_RECEIVED = "FlowFilesReceived";
String FLOW_FILES_TRANSFERRED = "FlowFilesTransferred";
String BYTES_RECEIVED = "BytesReceived";
String FLOW_FILES_SENT = "FlowFilesSent";
String BYTES_SENT = "BytesSent";
String FLOW_FILES_QUEUED = "FlowFilesQueued";
String BYTES_TRANSFERRED= "BytesTransferred";
String BYTES_QUEUED= "BytesQueued";
String BYTES_READ = "BytesRead";
String BYTES_WRITTEN = "BytesWritten";
String ACTIVE_THREADS = "ActiveThreads";
String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
String OUTPUT_CONTENT_SIZE = "OutputContentSize";
String INPUT_CONTENT_SIZE = "InputContentSize";
String QUEUED_CONTENT_SIZE = "QueuedContentSize";
String OUTPUT_COUNT = "OutputCount";
String INPUT_COUNT = "InputCount";
String QUEUED_COUNT = "QueuedCount";
String OUTPUT_BYTES = "OutputBytes";
String INPUT_BYTES = "InputBytes";
String QUEUED_BYTES = "QueuedBytes";
// JVM Metrics
String JVM_UPTIME = "jvm.uptime";
String JVM_HEAP_USED = "jvm.heap_used";
String JVM_HEAP_USAGE = "jvm.heap_usage";
String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
String JVM_THREAD_COUNT = "jvm.thread_count";
String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
String JVM_GC_RUNS = "jvm.gc.runs";
String JVM_GC_TIME = "jvm.gc.time";
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import java.util.ArrayList;
import java.util.List;
/**
* MetricsBuilder builds the list of metrics
*/
public class MetricsBuilder{
private List<Metric> metrics = new ArrayList<>();
private String computer;
private String categoryName;
private String processGroupId;
private String processGroupName;
private String processorId;
private String processorName;
private boolean isProcessorMetric = false;
private String tags = null;
public MetricsBuilder(String category, String instanceId, String processGroupId, String processGroupName) {
this.computer = instanceId;
this.processGroupName = processGroupName;
this.processGroupId = processGroupId;
this.categoryName = category;
if (category.equals(Metric.CATEGORY_PROCESSOR)){
isProcessorMetric = true;
}
}
public MetricsBuilder(String category, String instanceId, String processGroupId, String processGroupName, String processorId, String processorName) {
this(category, instanceId,processGroupId,processGroupName);
this.processorId = processorId;
this.processorName =processorName;
}
public MetricsBuilder setProcessorId(String processorId){
this.processorId = processorId;
return this;
}
public MetricsBuilder setProcessorName(String processorName){
this.processorName = processorName;
return this;
}
public MetricsBuilder setTags(String tags) {
this.tags = tags;
return this;
}
public MetricsBuilder metric(String metricName, long count){
Metric metric = null;
if(isProcessorMetric) {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
metric.setProcessorId(this.processorId);
metric.setProcessorName(this.processorName);
} else {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
}
metric.setCategoryName(this.categoryName);
metric.setName(metricName);
metric.setCount(count);
if(this.tags != null) {
metric.setTags(this.tags);
}
metrics.add(metric);
return this;
}
public MetricsBuilder metric(String metricName, double count){
Metric metric = null;
if(isProcessorMetric) {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
metric.setProcessorId(this.processorId);
metric.setProcessorName(this.processorName);
} else {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
}
metric.setCategoryName(this.categoryName);
metric.setName(metricName);
metric.setCount(count);
if(this.tags != null) {
metric.setTags(this.tags);
}
metrics.add(metric);
return this;
}
public MetricsBuilder metric(String metricName, int count) {
Metric metric = null;
if(isProcessorMetric) {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
metric.setProcessorId(this.processorId);
metric.setProcessorName(this.processorName);
} else {
metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
}
metric.setCategoryName(this.categoryName);
metric.setName(metricName);
metric.setCount(count);
if(this.tags != null) {
metric.setTags(this.tags);
}
metrics.add(metric);
return this;
}
public List<Metric> build() {
return metrics;
}
public List<Metric> getMetrics() {
return this.metrics;
}
public void setMetrics(List<Metric> metrics) {
this.metrics = metrics;
}
public String getComputer() {
return this.computer;
}
public void setComputer(String Computer) {
this.computer = Computer;
}
public String getCategoryName() {
return this.categoryName;
}
public void setCategoryName(String CategoryName) {
this.categoryName = CategoryName;
}
public String getProcessGroupId() {
return this.processGroupId;
}
public void setProcessGroupId(String ProcessGroupId) {
this.processGroupId = ProcessGroupId;
}
public String getProcessGroupName() {
return this.processGroupName;
}
public void setProcessGroupName(String ProcessGroupName) {
this.processGroupName = ProcessGroupName;
}
public String getProcessorId() {
return this.processorId;
}
public String getProcessorName() {
return this.processorName;
}
public boolean isIsProcessorMetric() {
return this.isProcessorMetric;
}
public boolean getIsProcessorMetric() {
return this.isProcessorMetric;
}
public void setIsProcessorMetric(boolean isProcessorMetric) {
this.isProcessorMetric = isProcessorMetric;
}
public String getTags() {
return this.tags;
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.azure.loganalytics.MetricNames;
import org.apache.nifi.reporting.azure.loganalytics.Metric;
import org.apache.nifi.reporting.azure.loganalytics.MetricsBuilder;
public class AzureLogAnalyticsMetricsFactory {
public static List<Metric> getDataFlowMetrics(ProcessGroupStatus status, String instanceId){
final String groupId = status.getId();
final String groupName = status.getName();
MetricsBuilder builder= new MetricsBuilder(Metric.CATEGORY_DATAFLOW,instanceId, groupId, groupName);
// build dataflow metrics
builder.metric(MetricNames.FLOW_FILES_RECEIVED, status.getFlowFilesReceived())
.metric(MetricNames.FLOW_FILES_SENT, status.getFlowFilesSent())
.metric(MetricNames.FLOW_FILES_QUEUED, status.getQueuedCount())
.metric(MetricNames.BYTES_RECEIVED,status.getBytesReceived())
.metric(MetricNames.BYTES_WRITTEN,status.getBytesWritten())
.metric(MetricNames.BYTES_READ, status.getBytesRead())
.metric(MetricNames.BYTES_SENT, status.getBytesSent())
.metric(MetricNames.BYTES_QUEUED,status.getQueuedContentSize())
.metric(MetricNames.ACTIVE_THREADS,status.getActiveThreadCount())
.metric(MetricNames.TOTAL_TASK_DURATION_SECONDS,calculateProcessingNanos(status));
return builder.build();
}
public static List<Metric> getConnectionStatusMetrics(ConnectionStatus status, String instanceId, String groupName){
final String groupId = status.getGroupId();
final String tags = String.format(
"[source=%s][destination=%s][cname=%s]", status.getSourceName(), status.getDestinationName(),
status.getName());
MetricsBuilder builder= new MetricsBuilder(Metric.CATEGORY_CONNECTIONS,instanceId, groupId, groupName);
builder.setTags(tags)
.metric(MetricNames.INPUT_COUNT,status.getInputCount())
.metric(MetricNames.INPUT_BYTES, status.getInputBytes())
.metric(MetricNames.QUEUED_COUNT, status.getQueuedCount())
.metric(MetricNames.QUEUED_BYTES, status.getQueuedBytes())
.metric(MetricNames.OUTPUT_COUNT, status.getOutputCount())
.metric(MetricNames.OUTPUT_BYTES, status.getOutputBytes());
return builder.build();
}
public static List<Metric> getProcessorMetrics(ProcessorStatus status, String instanceId, String groupName){
MetricsBuilder builder= new MetricsBuilder(Metric.CATEGORY_PROCESSOR,instanceId, status.getGroupId(), groupName);
builder.setProcessorId(status.getId())
.setProcessorName(status.getName())
.metric(MetricNames.FLOW_FILES_RECEIVED, status.getInputCount())
.metric(MetricNames.FLOW_FILES_SENT, status.getOutputCount())
.metric(MetricNames.BYTES_READ, status.getInputBytes())
.metric(MetricNames.BYTES_WRITTEN,status.getOutputBytes())
.metric(MetricNames.ACTIVE_THREADS, status.getActiveThreadCount())
.metric(MetricNames.TOTAL_TASK_DURATION_SECONDS, status.getProcessingNanos());
return builder.build();
}
//virtual machine metrics
public static List<Metric> getJvmMetrics(JvmMetrics virtualMachineMetrics, String instanceId, String groupName) {
MetricsBuilder builder = new MetricsBuilder(Metric.CATEGORY_JVM, instanceId, "", groupName);
builder.metric(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed(DataUnit.B))
.metric(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage())
.metric(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage())
.metric(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage())
.metric(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime())
.metric(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount())
.metric(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
// Append GC stats
virtualMachineMetrics.garbageCollectors()
.forEach((name, stat) -> {
name = name.toLowerCase().replaceAll("\\s", "_");
builder.metric(MetricNames.JVM_GC_RUNS + "." + name, stat.getRuns())
.metric(MetricNames.JVM_GC_TIME + "." + name, stat.getTime(TimeUnit.MILLISECONDS));
});
// Append thread states
virtualMachineMetrics.threadStatePercentages()
.forEach((state, usage) -> {
String name = state.name().toLowerCase().replaceAll("\\s", "_");
builder.metric("jvm.thread_states." + name, usage);
});
// Append pool stats
virtualMachineMetrics.memoryPoolUsage()
.forEach((name, usage) -> {
name = name.toLowerCase().replaceAll("\\s", "_");
builder.metric("jvm.mem_pool_" + name, usage);
});
return builder.build();
}
// calculates the total processing time of all processors in nanos
static long calculateProcessingNanos(final ProcessGroupStatus status) {
long nanos = 0L;
for (final ProcessorStatus procStats : status.getProcessorStatus()) {
nanos += procStats.getProcessingNanos();
}
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
nanos += calculateProcessingNanos(childGroupStatus);
}
return nanos;
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask

View File

@ -0,0 +1,54 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<title>AzureLogAnalyticsReportingTask</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h2>AzureLogAnalyticsReportingTask</h2>
<p>This ReportingTask sends the following metrics to Azure Log Analytics workspace:</p>
<ul>
<li>FlowFilesReceivedLast5Minutes</li>
<li>BytesReceivedLast5Minutes</li>
<li>FlowFilesSentLast5Minutes</li>
<li>BytesSentLast5Minutes</li>
<li>FlowFilesQueued</li>
<li>BytesQueued</li>
<li>BytesReadLast5Minutes</li>
<li>BytesWrittenLast5Minutes</li>
<li>ActiveThreads</li>
<li>TotalTaskDurationSeconds</li>
<li>jvm.uptime</li>
<li>jvm.heap_used</li>
<li>jvm.heap_usage</li>
<li>jvm.non_heap_usage</li>
<li>jvm.thread_states.runnable</li>
<li>jvm.thread_states.blocked</li>
<li>jvm.thread_states.timed_waiting</li>
<li>jvm.thread_states.terminated</li>
<li>jvm.thread_count</li>
<li>jvm.daemon_thread_count</li>
<li>jvm.file_descriptor_usage</li>
<li>jvm.gc.runs</li>
<li>jvm.gc.time</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockReportingContext;
import org.apache.nifi.util.MockReportingInitializationContext;
import org.apache.nifi.util.MockVariableRegistry;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class TestAzureLogAnalyticsReportingTask {
private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
private static final String TEST_TASK_ID = "test-azureloganalyticsreportingtask-id";
private static final String MOCK_KEY = "abcdefg";
private static final String TEST_GROUP1_ID= "testgpid1";
private static final String TEST_GROUP2_ID= "testgpid2";
private MockReportingInitializationContext reportingInitContextStub;
private MockReportingContext reportingContextStub;
private TestableAzureLogAnalyticsReportingTask testedReportingTask;
private ProcessGroupStatus rootGroupStatus;
private ProcessGroupStatus testGroupStatus;
private ProcessGroupStatus testGroupStatus2;
private ProcessorStatus procStatus;
@Before
public void setup() {
testedReportingTask = new TestableAzureLogAnalyticsReportingTask();
rootGroupStatus = new ProcessGroupStatus();
reportingInitContextStub = new MockReportingInitializationContext(TEST_INIT_CONTEXT_ID, TEST_INIT_CONTEXT_NAME,
new MockComponentLog(TEST_TASK_ID, testedReportingTask));
reportingContextStub = new MockReportingContext(Collections.emptyMap(),
new MockStateManager(testedReportingTask), new MockVariableRegistry());
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.INSTANCE_ID.getName(), TEST_TASK_ID);
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.LOG_ANALYTICS_WORKSPACE_ID.getName(), TEST_TASK_ID);
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.LOG_ANALYTICS_WORKSPACE_KEY.getName(), MOCK_KEY);
rootGroupStatus.setId("1234");
rootGroupStatus.setFlowFilesReceived(5);
rootGroupStatus.setBytesReceived(10000);
rootGroupStatus.setFlowFilesSent(10);
rootGroupStatus.setBytesSent(20000);
rootGroupStatus.setQueuedCount(100);
rootGroupStatus.setQueuedContentSize(1024L);
rootGroupStatus.setBytesRead(60000L);
rootGroupStatus.setBytesWritten(80000L);
rootGroupStatus.setActiveThreadCount(5);
rootGroupStatus.setName("root");
rootGroupStatus.setFlowFilesTransferred(5);
rootGroupStatus.setBytesTransferred(10000);
rootGroupStatus.setOutputContentSize(1000L);
rootGroupStatus.setInputContentSize(1000L);
rootGroupStatus.setOutputCount(100);
rootGroupStatus.setInputCount(1000);
initProcessorStatuses();
}
private void initProcessorStatuses() {
procStatus = new ProcessorStatus();
procStatus.setProcessingNanos(123456789);
procStatus.setInputCount(2);
procStatus.setOutputCount(4);
procStatus.setActiveThreadCount(6);
procStatus.setBytesSent(1256);
procStatus.setName("sampleProcessor");
Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
rootGroupStatus.setProcessorStatus(processorStatuses);
ProcessGroupStatus groupStatus = new ProcessGroupStatus();
groupStatus.setProcessorStatus(processorStatuses);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus);
rootGroupStatus.setProcessGroupStatus(groupStatuses);
}
private void initTestGroupStatuses() {
testGroupStatus = new ProcessGroupStatus();
testGroupStatus.setId(TEST_GROUP1_ID);
testGroupStatus.setFlowFilesReceived(5);
testGroupStatus.setBytesReceived(10000);
testGroupStatus.setFlowFilesSent(10);
testGroupStatus.setBytesSent(20000);
testGroupStatus.setQueuedCount(100);
testGroupStatus.setQueuedContentSize(1024L);
testGroupStatus.setBytesRead(60000L);
testGroupStatus.setBytesWritten(80000L);
testGroupStatus.setActiveThreadCount(5);
testGroupStatus.setName(TEST_GROUP1_ID);
testGroupStatus.setFlowFilesTransferred(5);
testGroupStatus.setBytesTransferred(10000);
testGroupStatus.setOutputContentSize(1000L);
testGroupStatus.setInputContentSize(1000L);
testGroupStatus.setOutputCount(100);
testGroupStatus.setInputCount(1000);
}
private void initTestGroup2Statuses() {
testGroupStatus2 = new ProcessGroupStatus();
testGroupStatus2.setId(TEST_GROUP2_ID);
testGroupStatus2.setFlowFilesReceived(5);
testGroupStatus2.setBytesReceived(10000);
testGroupStatus2.setFlowFilesSent(10);
testGroupStatus2.setBytesSent(20000);
testGroupStatus2.setQueuedCount(100);
testGroupStatus2.setQueuedContentSize(1024L);
testGroupStatus2.setBytesRead(60000L);
testGroupStatus2.setBytesWritten(80000L);
testGroupStatus2.setActiveThreadCount(5);
testGroupStatus2.setName(TEST_GROUP2_ID);
testGroupStatus2.setFlowFilesTransferred(5);
testGroupStatus2.setBytesTransferred(10000);
testGroupStatus2.setOutputContentSize(1000L);
testGroupStatus2.setInputContentSize(1000L);
testGroupStatus2.setOutputCount(100);
testGroupStatus2.setInputCount(1000);
}
@Test
public void testOnTrigger() throws IOException, InterruptedException, InitializationException {
testedReportingTask.initialize(reportingInitContextStub);
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
testedReportingTask.onTrigger(reportingContextStub);
List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
TestVerification.assertDatatFlowMetrics(collectedMetrics);
}
@Test
public void testOnTriggerWithOnePG() throws IOException, InterruptedException, InitializationException {
initTestGroupStatuses();
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.PROCESS_GROUP_IDS.getName(), TEST_GROUP1_ID);
testedReportingTask.initialize(reportingInitContextStub);
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP1_ID, testGroupStatus);
testedReportingTask.onTrigger(reportingContextStub);
List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
TestVerification.assertDatatFlowMetrics(collectedMetrics);
}
@Test
public void testOnTriggerWithPGList() throws IOException, InterruptedException, InitializationException {
initTestGroupStatuses();
initTestGroup2Statuses();
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.PROCESS_GROUP_IDS.getName(),
String.format("%s, %s", TEST_GROUP1_ID, TEST_GROUP2_ID));
testedReportingTask.initialize(reportingInitContextStub);
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP1_ID, testGroupStatus);
reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP2_ID, testGroupStatus2);
testedReportingTask.onTrigger(reportingContextStub);
List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
TestVerification.assertDatatFlowMetrics(collectedMetrics);
}
@Test
public void testEmitJVMMetrics() throws IOException, InterruptedException, InitializationException {
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.SEND_JVM_METRICS.getName(), "true");
testedReportingTask.initialize(reportingInitContextStub);
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
testedReportingTask.onTrigger(reportingContextStub);
List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
TestVerification.assertJVMMetrics(collectedMetrics);
}
@Test
public void testAuthorization() throws IOException, InterruptedException, InitializationException {
reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.SEND_JVM_METRICS.getName(), "true");
testedReportingTask.initialize(reportingInitContextStub);
reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
testedReportingTask.onTrigger(reportingContextStub);
HttpPost postRequest = testedReportingTask.getPostRequest();
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(postRequest, atLeast(1)).addHeader( eq("Authorization"), captor.capture());
assertTrue(captor.getValue().contains("SharedKey"));
}
private final class TestableAzureLogAnalyticsReportingTask extends AzureLogAnalyticsReportingTask {
private List<Metric> metricsCollected;
@Override
protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
final List<Metric> allMetrics) throws IOException{
metricsCollected = allMetrics;
super.sendMetrics(request, workspaceId, linuxPrimaryKey, allMetrics);
}
public List<Metric> getMetricsCollected() {
return metricsCollected;
}
private HttpPost mockHttpPost;
@Override
protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName) throws IllegalArgumentException {
mockHttpPost = Mockito.mock(HttpPost.class);
return mockHttpPost;
}
public HttpPost getPostRequest(){
return mockHttpPost;
}
@Override
protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
throws IOException, RuntimeException {
// replace with mock httpclient and call base postRequest
CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class);
super.postRequest(mockClient, request);
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import java.util.ArrayList;
import java.util.List;
import com.google.gson.Gson;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
import org.junit.Before;
import org.junit.Test;
public class TestMetricsFactory {
private ProcessGroupStatus status;
private final Gson gson = new Gson();
@Before
public void init() {
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
status.setFlowFilesSent(10);
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
}
@Test
public void testGetDataFlowMetrics() {
ProcessorStatus procStatus = new ProcessorStatus();
List<ProcessorStatus> processorStatuses = new ArrayList<>();
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
List<Metric> metrics = AzureLogAnalyticsMetricsFactory.getDataFlowMetrics(status, "testcase");
TestVerification.assertDatatFlowMetrics(metrics);
}
@Test
public void testGetVirtualMachineMetrics() {
JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
List<Metric> metrics = AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, "testcase", "tests");
String metricsInString = gson.toJson(metrics);
System.out.println(metricsInString);
TestVerification.assertJVMMetrics(metrics);
}
@Test
public void testToJsonWithLongValue() {
Metric metric = new Metric("instanceId", "groupId", "groupName");
metric.setCount(0x7ff8000000000000L);
gson.toJson(metric);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import static org.junit.Assert.assertTrue;
import java.util.List;
public class TestVerification {
static public void assertDatatFlowMetrics(List<Metric> collectedMetrics) {
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.FLOW_FILES_RECEIVED) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.BYTES_RECEIVED) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.FLOW_FILES_SENT) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.BYTES_SENT) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.FLOW_FILES_QUEUED) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.BYTES_QUEUED) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.BYTES_READ) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.BYTES_WRITTEN) && o.getCategoryName().equals("DataFlow")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.ACTIVE_THREADS) && o.getCategoryName().equals("DataFlow")));
}
static public void assertJVMMetrics(List<Metric> collectedMetrics) {
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_HEAP_USED) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_NON_HEAP_USAGE) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_THREAD_COUNT) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_FILE_DESCRIPTOR_USAGE) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_DAEMON_THREAD_COUNT) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_THREAD_STATES_BLOCKED) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_UPTIME) && o.getCategoryName().equals("JvmMetrics")));
assertTrue( collectedMetrics.stream().anyMatch(
o -> o.getName().equals(MetricNames.JVM_HEAP_USAGE) && o.getCategoryName().equals("JvmMetrics")));
}
}

View File

@ -31,6 +31,7 @@
<modules>
<module>nifi-azure-processors</module>
<module>nifi-azure-reporting-task</module>
<module>nifi-azure-nar</module>
<module>nifi-azure-services-api</module>
<module>nifi-azure-services-api-nar</module>