NIFI-6942 This closes #3934. Added a reporting task to push provenance data to Azure Log Analytics.

Signed-off-by: Joe Witt <joewitt@apache.org>
Authored by Ubuntu on 2019-12-13 20:12:28 +00:00; committed by Joe Witt
parent e4bdc79ea6
commit 65ca8e6c8d
6 changed files with 788 additions and 185 deletions

nifi-azure-reporting-task/pom.xml

@@ -59,5 +59,11 @@
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-reporting-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java

@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.MessageFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
/**
* Abstract ReportingTask to send metrics from Apache NiFi and JVM to Azure
* Monitor.
*/
public abstract class AbstractAzureLogAnalyticsReportingTask extends AbstractReportingTask {
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String HMAC_SHA256_ALG = "HmacSHA256";
private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter
.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Id").description("Log Analytics Workspace Id").required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Key").description("Azure Log Analytic Worskspace Key").required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder().name("Application ID")
.description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
.required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder().name("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
.required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder().name("Process group ID(s)")
.description(
"If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+ " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.createListValidator(true, true,
StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
.build();
static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder().name("Job Name")
.description("The name of the exporting job").defaultValue("nifi_reporting_job")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
.name("Log Analytics URL Endpoint Format").description("Log Analytics URL Endpoint Format").required(false)
.defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
protected String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
try {
String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength,
rfc1123Date);
Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
return String.format("SharedKey %s:%s", workspaceId, hmac);
} catch (NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LOG_ANALYTICS_WORKSPACE_ID);
properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
properties.add(APPLICATION_ID);
properties.add(INSTANCE_ID);
properties.add(PROCESS_GROUP_IDS);
properties.add(JOB_NAME);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
return properties;
}
/**
* Construct HttpPost and return it
*
* @param urlFormat URL format to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
* @param logName log table name where metrics will be pushed
* @return HttpPost addressed to your Azure Log Analytics workspace
* @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
*/
protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
throws IllegalArgumentException {
String dataCollectorEndpoint = MessageFormat.format(urlFormat, workspaceId);
HttpPost post = new HttpPost(dataCollectorEndpoint);
post.addHeader("Content-Type", "application/json");
post.addHeader("Log-Type", logName);
return post;
}
protected void sendToLogAnalytics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
final String rawJson) throws IllegalArgumentException, RuntimeException, IOException {
final int bodyLength = rawJson.getBytes(UTF8).length;
final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
request.addHeader("Authorization", createAuthorization);
request.addHeader("x-ms-date", nowRfc1123);
request.setEntity(new StringEntity(rawJson));
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
postRequest(httpClient, request);
}
}
/**
* Posts the request with the given httpClient.
*
* @param httpClient HttpClient
* @param request HttpPost
* @throws IOException if httpClient.execute fails
* @throws RuntimeException if the response status code is anything other than 200
*/
protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
throws IOException, RuntimeException {
try (CloseableHttpResponse response = httpClient.execute(request)) {
if (response != null && response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(response.getStatusLine().toString());
}
}
}
}
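For reference, createAuthorization above implements the SharedKey scheme of the Azure Log Analytics HTTP Data Collector API: an HMAC-SHA256 over "POST\n{content-length}\napplication/json\nx-ms-date:{date}\n/api/logs", keyed with the base64-decoded workspace key. A minimal standalone sketch of the same computation (the workspace ID, key, body, and date below are placeholder values, not real credentials):

import java.nio.charset.StandardCharsets;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;

public class SharedKeySignatureExample {
    public static void main(String[] args) throws Exception {
        final String workspaceId = "00000000-0000-0000-0000-000000000000"; // placeholder
        final String workspaceKey = "c2VjcmV0LWtleQ==";                    // placeholder (base64)
        final String body = "[{\"eventType\":\"SEND\"}]";
        final String xMsDate = "Fri, 13 Dec 2019 20:12:28 GMT";

        // String-to-sign defined by the Data Collector API:
        // VERB \n content-length \n content-type \n x-ms-date header \n resource
        final String stringToSign = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs",
                body.getBytes(StandardCharsets.UTF_8).length, xMsDate);

        final Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(workspaceKey), "HmacSHA256"));
        final String signature = DatatypeConverter.printBase64Binary(
                mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)));

        // Sent as the Authorization header, alongside the matching x-ms-date header
        System.out.println("Authorization: SharedKey " + workspaceId + ":" + signature);
    }
}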

src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java

@@ -0,0 +1,470 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpPost;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
@Tags({ "azure", "provenace", "reporting", "log analytics" })
@CapabilityDescription("Publishes Provenance events to to a Azure Log Analytics workspace.")
public class AzureLogAnalyticsProvenanceReportingTask extends AbstractAzureLogAnalyticsReportingTask {
protected static final String LAST_EVENT_ID_KEY = "last_event_id";
protected static final String DESTINATION_URL_PATH = "/nifi";
protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
.name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
.defaultValue("nifiprovenance").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream",
"Beginning of Stream",
"Start reading provenance Events from the beginning of the stream (the oldest event first)");
static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
"Start reading provenance Events from the end of the stream, ignoring old events");
static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter").displayName("Event Type to Include")
.description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+ "Available event types are "
+ Arrays.deepToString(ProvenanceEventType.values())
+ ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter-exclude").displayName("Event Type to Exclude")
.description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+ "Available event types are "
+ Arrays.deepToString(ProvenanceEventType.values())
+ ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+ "exclusion takes precedence and the event will not be sent.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter").displayName("Component Type to Include")
.description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter-exclude").displayName("Component Type to Exclude")
.description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+ "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter").displayName("Component ID to Include")
.description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter-exclude").displayName("Component ID to Exclude")
.description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+ "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
.name("s2s-prov-task-name-filter").displayName("Component Name to Include")
.description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-name-filter-exclude").displayName("Component Name to Exclude")
.description("Regular expression to exclude the provenance events based on the component name. The events matching the regular "
+ "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
.required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("start-position")
.displayName("Start Position")
.description("If the Reporting Task has never been run, or if its state has been reset by a user, "
+ "specifies where in the stream of Provenance Events the Reporting Task should start")
.allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
.defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder().name("include-null-values")
.displayName("Include Null Values")
.description("Indicate if null values should be included in records. Default will be false")
.required(true).allowableValues("true", "false").defaultValue("false").build();
static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform")
.description("The value to use for the platform field in each event.").required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder().name("Instance URL")
.displayName("Instance URL")
.description("The URL of this instance to use in the Content URI of each event.").required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("http://${hostname(true)}:8080/nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size")
.displayName("Batch Size")
.description("Specifies how many records to send in a single batch, at most.").required(true)
.defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
private ConfigurationContext context;
private volatile ProvenanceEventConsumer consumer;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LOG_ANALYTICS_WORKSPACE_ID);
properties.add(LOG_ANALYTICS_CUSTOM_LOG_NAME);
properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
properties.add(APPLICATION_ID);
properties.add(INSTANCE_ID);
properties.add(JOB_NAME);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
properties.add(FILTER_EVENT_TYPE);
properties.add(FILTER_EVENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_TYPE);
properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_ID);
properties.add(FILTER_COMPONENT_ID_EXCLUDE);
properties.add(FILTER_COMPONENT_NAME);
properties.add(FILTER_COMPONENT_NAME_EXCLUDE);
properties.add(START_POSITION);
properties.add(ALLOW_NULL_VALUES);
properties.add(PLATFORM);
properties.add(INSTANCE_URL);
properties.add(BATCH_SIZE);
return properties;
}
public void createConsumer(final ReportingContext context) {
if (consumer != null)
return;
consumer = new ProvenanceEventConsumer();
consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
consumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
consumer.setLogger(getLogger());
// initialize component type filtering
consumer.setComponentTypeRegex(
context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE)
.evaluateAttributeExpressions().getValue());
consumer.setComponentNameRegex(
context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE)
.evaluateAttributeExpressions().getValue());
final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(
context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), ','));
if (targetEventTypes != null) {
for (final String type : targetEventTypes) {
try {
consumer.addTargetEventType(ProvenanceEventType.valueOf(type));
} catch (final Exception e) {
getLogger().warn(type
+ " is not a valid event type; removing it from the filter.");
}
}
}
final String[] targetEventTypesExclude = StringUtils
.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE)
.evaluateAttributeExpressions().getValue(), ','));
if (targetEventTypesExclude != null) {
for (final String type : targetEventTypesExclude) {
try {
consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
} catch (final Exception e) {
getLogger().warn(type
+ " is not a valid event type; removing it from the exclude filter.");
}
}
}
// initialize component ID filtering
final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(
context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(),
','));
if (targetComponentIds != null) {
consumer.addTargetComponentId(targetComponentIds);
}
final String[] targetComponentIdsExclude = StringUtils
.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE)
.evaluateAttributeExpressions().getValue(), ','));
if (targetComponentIdsExclude != null) {
consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
}
consumer.setScheduled(true);
}
@Override
public void onTrigger(ReportingContext context) {
final boolean isClustered = context.isClustered();
final String nodeId = context.getClusterNodeIdentifier();
if (nodeId == null && isClustered) {
getLogger().debug(
"This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+ "Will wait for Node Identifier to be established.");
return;
}
try {
processProvenanceData(context);
} catch (final Exception e) {
getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
}
}
public void processProvenanceData(final ReportingContext context) throws IOException {
getLogger().debug("Starting to process provenance data");
final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID)
.evaluateAttributeExpressions().getValue();
final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY)
.evaluateAttributeExpressions().getValue();
final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
.getValue();
final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
.evaluateAttributeExpressions().getValue();
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String dataCollectorEndpoint = MessageFormat.format(urlEndpointFormat, workspaceId);
final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
final String nodeId = context.getClusterNodeIdentifier();
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
URL url;
try {
url = new URL(nifiUrl);
} catch (final MalformedURLException e1) {
throw new AssertionError(e1);
}
final String hostname = url.getHost();
final Map<String, Object> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
createConsumer(context);
consumer.consumeEvents(context, (mapHolder, events) -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append('[');
for (final ProvenanceEventRecord event : events) {
final String componentName = mapHolder.getComponentName(event.getComponentId());
final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(),
event.getComponentType());
final String processGroupName = mapHolder.getComponentName(processGroupId);
final JsonObject jo = serialize(factory, builder, event, df, componentName,
processGroupId, processGroupName, hostname, url, rootGroupName,
platform, nodeId, allowNullValues);
stringBuilder.append(jo.toString());
stringBuilder.append(',');
}
if (stringBuilder.charAt(stringBuilder.length() - 1) == ',')
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(']');
String str = stringBuilder.toString();
if (!str.equals("[]")) {
final HttpPost httpPost = new HttpPost(dataCollectorEndpoint);
httpPost.addHeader("Content-Type", "application/json");
httpPost.addHeader("Log-Type", logName);
getLogger().debug("Sending " + batchSize + " events of length " + str.length() + " to azure log analytics " + logName);
try {
sendToLogAnalytics(httpPost, workspaceId, linuxPrimaryKey, str);
} catch (final Exception e) {
getLogger().error("Failed to publish provenance data to Azure Log Analytics", e);
}
}
});
getLogger().debug("Done processing provenance data");
}
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder,
final ProvenanceEventRecord event, final DateFormat df, final String componentName,
final String processGroupId, final String processGroupName, final String hostname,
final URL nifiUrl, final String applicationName, final String platform,
final String nodeIdentifier, Boolean allowNullValues) {
addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
addField(builder, "eventType", event.getEventType().name(), allowNullValues);
addField(builder, "timestampMillis", event.getEventTime(), allowNullValues);
addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues);
addField(builder, "durationMillis", event.getEventDuration(), allowNullValues);
addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues);
addField(builder, "details", event.getDetails(), allowNullValues);
addField(builder, "componentId", event.getComponentId(), allowNullValues);
addField(builder, "componentType", event.getComponentType(), allowNullValues);
addField(builder, "componentName", componentName, allowNullValues);
addField(builder, "processGroupId", processGroupId, allowNullValues);
addField(builder, "processGroupName", processGroupName, allowNullValues);
addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues);
addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues);
addField(builder, "entitySize", event.getFileSize(), allowNullValues);
addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues);
addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
addField(builder, "actorHostname", hostname, allowNullValues);
if (nifiUrl != null) {
// To get the URL prefix, we just remove "/nifi" from the end of the URL.
// We know the URL ends with "/nifi" because the Property Validator enforces it.
final String urlString = nifiUrl.toString();
final String urlPrefix = urlString.substring(0,
urlString.length() - DESTINATION_URL_PATH.length());
final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId()
+ "/content/";
final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues);
addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix,
allowNullValues);
}
addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
addField(builder, "transitUri", event.getTransitUri(), allowNullValues);
addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues);
addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "application", applicationName, allowNullValues);
return builder.build();
}
@OnUnscheduled
public void onUnscheduled() {
if (consumer != null) {
getLogger().debug("Disabling schedule to consume provenance data.");
consumer.setScheduled(false);
}
}
public static void addField(final JsonObjectBuilder builder, final String key, final Object value,
boolean allowNullValues) {
if (value != null) {
if (value instanceof String) {
builder.add(key, (String) value);
} else if (value instanceof Integer) {
builder.add(key, (Integer) value);
} else if (value instanceof Boolean) {
builder.add(key, (Boolean) value);
} else if (value instanceof Long) {
builder.add(key, (Long) value);
} else {
builder.add(key, value.toString());
}
} else if (allowNullValues) {
builder.add(key, JsonValue.NULL);
}
}
public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
final Map<String, String> values, Boolean allowNullValues) {
if (values != null) {
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, String> entry : values.entrySet()) {
if (entry.getKey() == null) {
continue;
} else if (entry.getValue() == null) {
if (allowNullValues) {
mapBuilder.add(entry.getKey(), JsonValue.NULL);
}
} else {
mapBuilder.add(entry.getKey(), entry.getValue());
}
}
builder.add(key, mapBuilder);
} else if (allowNullValues) {
builder.add(key, JsonValue.NULL);
}
}
public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
final Collection<String> values, Boolean allowNullValues) {
if (values != null) {
builder.add(key, createJsonArray(factory, values));
} else if (allowNullValues) {
builder.add(key, JsonValue.NULL);
}
}
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
final JsonArrayBuilder builder = factory.createArrayBuilder();
for (final String value : values) {
if (value != null) {
builder.add(value);
}
}
return builder;
}
}
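For orientation, the body this task posts is a JSON array of objects built by serialize(), one per provenance event, stored under the custom log named by the Log-Type header. A single event looks roughly like the following; all values are illustrative, and the attribute maps, lineage fields, and content URIs are omitted here for brevity:

[ {
  "eventId" : "4c1f2f34-5a6b-4c7d-8e9f-0a1b2c3d4e5f",
  "eventOrdinal" : 42,
  "eventType" : "SEND",
  "timestampMillis" : 1576267948000,
  "timestamp" : "2019-12-13T20:12:28.000Z",
  "durationMillis" : 12,
  "componentId" : "d290f1ee-6c54-4b01-90e6-d701748f0851",
  "componentType" : "PublishKafka",
  "componentName" : "Publish events",
  "processGroupId" : "91d4b0ae-27ee-4a1c-ae36-50cfd68d3b1f",
  "processGroupName" : "Ingest",
  "entityId" : "7f3808b1-2f6b-4c5e-9d4a-1e2f3a4b5c6d",
  "entityType" : "org.apache.nifi.flowfile.FlowFile",
  "entitySize" : 2048,
  "platform" : "nifi",
  "application" : "NiFi Flow"
} ]

Null-valued fields appear (as JSON nulls) only when Include Null Values is set to true; otherwise addField drops them.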

src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java

@@ -17,29 +17,13 @@
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.MessageFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -51,112 +35,29 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
/**
* ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
*/
@Tags({"reporting", "loganalytics", "metrics"})
@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace." +
"Apache NiFi-metrics can be either configured global or on process-group level.")
@Tags({ "azure", "metrics", "reporting", "log analytics" })
@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace."
+ "Apache NiFi-metrics can be either configured global or on process-group level.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
public class AzureLogAnalyticsReportingTask extends AbstractAzureLogAnalyticsReportingTask {
private static final String JVM_JOB_NAME = "jvm_global";
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String HMAC_SHA256_ALG = "HmacSHA256";
private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
private final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Id")
.description("Log Analytics Workspace Id")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder().name("Send JVM Metrics")
.description("Send JVM Metrics in addition to the NiFi-metrics").allowableValues("true", "false")
.defaultValue("false").required(true).build();
static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
.name("Log Analytics Custom Log Name")
.description("Log Analytics Custom Log Name")
.required(false)
.defaultValue("nifimetrics")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
.name("Log Analytics Workspace Key")
.description("Azure Log Analytic Worskspace Key")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
.name("Application ID")
.description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
.name("Process group ID(s)")
.description("If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+ " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators
.createListValidator(true, true, StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
.build();
static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
.name("Job Name")
.description("The name of the exporting job")
.defaultValue("nifi_reporting_job")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
.name("Send JVM Metrics")
.description("Send JVM Metrics in addition to the NiFi-metrics")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
.name("Log Analytics URL Endpoint Format")
.description("Log Analytics URL Endpoint Format")
.required(false)
.defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
try {
// Documentation: https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, rfc1123Date);
Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
return String.format("SharedKey %s:%s", workspaceId, hmac);
} catch (NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}
.name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
.defaultValue("nifimetrics").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -168,31 +69,34 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
properties.add(INSTANCE_ID);
properties.add(PROCESS_GROUP_IDS);
properties.add(JOB_NAME);
properties.add(SEND_JVM_METRICS);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
return properties;
}
@Override
public void onTrigger(final ReportingContext context){
final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
public void onTrigger(final ReportingContext context) {
final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions()
.getValue();
final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions()
.getValue();
final boolean jvmMetricsCollected = context.getProperty(SEND_JVM_METRICS).asBoolean();
final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
.getValue();
final String instanceId = context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
final String groupIds = context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
.evaluateAttributeExpressions().getValue();
try {
List<Metric> allMetrics = null;
if(groupIds == null || groupIds.isEmpty()) {
if (groupIds == null || groupIds.isEmpty()) {
ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
String processGroupName = status.getName();
allMetrics = collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected);
} else {
allMetrics = new ArrayList<>();
for(String groupId: groupIds.split(",")) {
groupId =groupId.trim();
for (String groupId : groupIds.split(",")) {
groupId = groupId.trim();
ProcessGroupStatus status = context.getEventAccess().getGroupStatus(groupId);
String processGroupName = status.getName();
allMetrics.addAll(collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected));
@@ -205,36 +109,22 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
}
}
/**
* Construct HttpPost and return it
* @param urlFormat URL format to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
* @param logName log table name where metrics will be pushed
* @return HttpPost addressed to your Azure Log Analytics workspace
* @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
*/
protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
throws IllegalArgumentException {
String dataCollectorEndpoint =
MessageFormat.format(urlFormat, workspaceId);
HttpPost post = new HttpPost(dataCollectorEndpoint);
post.addHeader("Content-Type", "application/json");
post.addHeader("Log-Type", logName);
return post;
}
/**
* send collected metrics to azure log analytics workspace
* @param request HttpPost to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
*
* @param request HttpPost to Azure Log Analytics Endpoint
* @param workspaceId your azure log analytics workspace id
* @param linuxPrimaryKey your azure log analytics workspace key
* @param allMetrics collected metrics to be sent
* @throws IOException when there is an error in the https url connection or read/write to the connection
* @throws IllegalArgumentException when there is an exception in converting metrics to a json string with Gson.toJson
* @throws RuntimeException when httpPost fails with a non-200 status code
* @param allMetrics collected metrics to be sent
* @throws IOException when there is an error in the https url
*                     connection or read/write to the connection
* @throws IllegalArgumentException when there is an exception in converting metrics
*                                  to a json string with Gson.toJson
* @throws RuntimeException when httpPost fails with a non-200 status
*                          code
*/
protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
throws IOException, IllegalArgumentException, RuntimeException {
protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
final List<Metric> allMetrics) throws IOException, IllegalArgumentException, RuntimeException {
Gson gson = new GsonBuilder().create();
StringBuilder builder = new StringBuilder();
builder.append('[');
@@ -243,45 +133,20 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
builder.append(',');
}
builder.append(']');
final String rawJson = builder.toString();
final int bodyLength = rawJson.getBytes(UTF8).length;
final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
request.addHeader("Authorization", createAuthorization);
request.addHeader("x-ms-date", nowRfc1123);
request.setEntity(new StringEntity(rawJson));
try(CloseableHttpClient httpClient = HttpClients.createDefault()){
postRequest(httpClient, request);
}
sendToLogAnalytics(request, workspaceId, linuxPrimaryKey, builder.toString());
}
/**
* Posts the request with the given httpClient.
* @param httpClient HttpClient
* @param request HttpPost
* @throws IOException if httpClient.execute fails
* @throws RuntimeException if the response status code is anything other than 200
*/
protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
throws IOException, RuntimeException {
try (CloseableHttpResponse response = httpClient.execute(request)) {
if(response != null && response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(response.getStatusLine().toString());
}
}
}
/**
* collect metrics to be sent to azure log analytics workspace
* @param instanceId instance id
* @param status process group status
* @param processGroupName process group name
*
* @param instanceId instance id
* @param status process group status
* @param processGroupName process group name
* @param jvmMetricsCollected whether we want to collect jvm metrics or not
* @return list of metrics collected
*/
private List<Metric> collectMetrics(final String instanceId,
final ProcessGroupStatus status, final String processGroupName, final boolean jvmMetricsCollected) {
protected List<Metric> collectMetrics(final String instanceId, final ProcessGroupStatus status,
final String processGroupName, final boolean jvmMetricsCollected) {
List<Metric> allMetrics = new ArrayList<>();
// dataflow process group level metrics
@@ -290,9 +155,9 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
// connections process group level metrics
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(status, connectionStatuses);
for (ConnectionStatus connectionStatus: connectionStatuses) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId, processGroupName));
for (ConnectionStatus connectionStatus : connectionStatuses) {
allMetrics.addAll(AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId,
processGroupName));
}
// processor level metrics
@@ -300,13 +165,12 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
populateProcessorStatuses(status, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName)
);
AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName));
}
if (jvmMetricsCollected) {
allMetrics.addAll(
AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
}
return allMetrics;
@@ -319,7 +183,8 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
}
}
private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
private void populateConnectionStatuses(final ProcessGroupStatus groupStatus,
final List<ConnectionStatus> statuses) {
statuses.addAll(groupStatus.getConnectionStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateConnectionStatuses(childGroupStatus, statuses);

src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask

@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsProvenanceReportingTask

src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.azure.loganalytics;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.json.Json;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObjectBuilder;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Test;
public class TestAzureLogAnalyticsProvenanceReportingTask {
@Test
public void testAddField1() throws IOException, InterruptedException, InitializationException {
final Map<String, Object> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyString", "StringValue", true);
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyInteger", 2674440, true);
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyLong", 1289904147324L, true);
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyBoolean", true, true);
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNotSupportedObject", 1.25, true);
AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNull", null, true);
javax.json.JsonObject actualJson = builder.build();
String expectedjsonString = "{" +
"\"TestKeyString\": \"StringValue\"," +
"\"TestKeyInteger\": 2674440," +
"\"TestKeyLong\": 1289904147324," +
"\"TestKeyBoolean\": true," +
"\"TestKeyNotSupportedObject\": \"1.25\"," +
"\"TestKeyNull\": null" +
"}";
JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
assertEquals(expectedJson.toString(), actualJson.toString());
}
@Test
public void testAddField2() throws IOException, InterruptedException, InitializationException {
final Map<String, Object> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
Map<String, String> values = new HashMap<String, String>();
values.put("TestKeyString1", "StringValue1");
values.put("TestKeyString2", "StringValue2");
AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
javax.json.JsonObject actualJson = builder.build();
String expectedjsonString = "{\"TestKeyString\":{\"TestKeyString2\":\"StringValue2\",\"TestKeyString1\":\"StringValue1\"}}";
JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
assertEquals(expectedJson.toString(), actualJson.toString());
}
@Test
public void testAddField3() throws IOException, InterruptedException, InitializationException {
final Map<String, Object> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
final JsonObjectBuilder builder = factory.createObjectBuilder();
Collection<String> values = new ArrayList<String>();
values.add("TestValueString1");
values.add("TestValueString2");
AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
javax.json.JsonObject actualJson = builder.build();
String expectedjsonString = "{\"TestKeyString\":[\"TestValueString1\",\"TestValueString2\"]}";
JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
assertEquals(expectedJson.toString(), actualJson.toString());
}
}
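The tests above all pass allowNullValues=true. Not covered is the allowNullValues=false path of addField, which simply omits null entries; a quick sketch in the same style (a hypothetical test, not part of this commit) would be:

@Test
public void testAddFieldSkipsNullsWhenNotAllowed() {
    final JsonBuilderFactory factory = Json.createBuilderFactory(Collections.emptyMap());
    final JsonObjectBuilder builder = factory.createObjectBuilder();
    AzureLogAnalyticsProvenanceReportingTask.addField(builder, "present", "value", false);
    AzureLogAnalyticsProvenanceReportingTask.addField(builder, "absent", null, false);
    // With allowNullValues=false the null entry is dropped entirely
    assertEquals("{\"present\":\"value\"}", builder.build().toString());
}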