NIFI-6429 - Added optional field to allow variables with null values to be included in reporting records. Default functionality will exclude variables with null values for backwards compatibility.

NIFI-6429 - checkstyle corrections
NIFI-6429 - change property descriptor name
This commit is contained in:
Yolanda Davis 2019-07-17 16:07:01 -04:00 committed by Andy I. Christianson
parent 5df6b0edbb
commit 07baf723f7
12 changed files with 576 additions and 315 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@ -187,8 +188,18 @@ public class MetricsService {
return metrics;
}
private boolean addEmptyValue(Map<String,?>metricsMap, String metricskey, JsonObjectBuilder objectBuilder, boolean allowNullValues){
if(metricsMap.get(metricskey) == null){
if(allowNullValues) {
objectBuilder.add(metricskey, JsonValue.NULL);
}
return true;
}
return false;
}
public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, JvmMetrics virtualMachineMetrics,
String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) {
String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad, boolean allowNullValues) {
JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
.add(MetricFields.APP_ID, applicationId)
.add(MetricFields.HOSTNAME, hostname)
@ -201,27 +212,38 @@ public class MetricsService {
Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics);
for (String key : integerMetrics.keySet()) {
objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key));
if(!addEmptyValue(integerMetrics,key,objectBuilder,allowNullValues)) {
objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key));
}
}
Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
for (String key : longMetrics.keySet()) {
objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
if(!addEmptyValue(longMetrics,key,objectBuilder,allowNullValues)) {
objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
}
}
Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics);
for (String key : doubleMetrics.keySet()) {
objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key));
if(!addEmptyValue(doubleMetrics,key,objectBuilder,allowNullValues)){
objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key));
}
}
Map<String,Long> longPgMetrics = getLongMetrics(status, false);
for (String key : longPgMetrics.keySet()) {
objectBuilder.add(key, longPgMetrics.get(key));
if(!addEmptyValue(longPgMetrics,key,objectBuilder,allowNullValues)) {
objectBuilder.add(key, longPgMetrics.get(key));
}
}
Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false);
for (String key : integerPgMetrics.keySet()) {
objectBuilder.add(key, integerPgMetrics.get(key));
if(!addEmptyValue(integerPgMetrics,key,objectBuilder,allowNullValues)) {
objectBuilder.add(key, integerPgMetrics.get(key));
}
}
return objectBuilder.build();

View File

@ -18,6 +18,8 @@ package org.apache.nifi.reporting.util.metrics.api;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
/**
* Builds the JsonObject for an individual metric.
@ -67,7 +69,11 @@ public class MetricBuilder {
return this;
}
public JsonObject build() {
public JsonObject build(final boolean allowNullValues) {
JsonObjectBuilder metricValueBuilder = this.metricValue == null && allowNullValues? factory.createObjectBuilder()
.add(String.valueOf(timestamp), JsonValue.NULL): factory.createObjectBuilder()
.add(String.valueOf(timestamp), this.metricValue);
return factory.createObjectBuilder()
.add(MetricFields.METRIC_NAME, metricName)
.add(MetricFields.APP_ID, applicationId)
@ -75,10 +81,12 @@ public class MetricBuilder {
.add(MetricFields.HOSTNAME, hostname)
.add(MetricFields.TIMESTAMP, timestamp)
.add(MetricFields.START_TIME, timestamp)
.add(MetricFields.METRICS,
factory.createObjectBuilder()
.add(String.valueOf(timestamp), metricValue)
).build();
.add(MetricFields.METRICS, metricValueBuilder)
.build();
}
public JsonObject build() {
return build(false);
}
}

View File

@ -16,12 +16,13 @@
*/
package org.apache.nifi.reporting.util.metrics.api;
import java.util.HashMap;
import java.util.Map;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.util.HashMap;
import java.util.Map;
/**
* Builds the overall JsonObject for the Metrics.
@ -72,7 +73,7 @@ public class MetricsBuilder {
return this;
}
public JsonObject build() {
public JsonObject build(final boolean allowNullValues) {
// builds JsonObject for individual metrics
final MetricBuilder metricBuilder = new MetricBuilder(factory);
metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
@ -81,7 +82,7 @@ public class MetricsBuilder {
for (Map.Entry<String,String> entry : metrics.entrySet()) {
metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
metricArrayBuilder.add(metricBuilder.build());
metricArrayBuilder.add(metricBuilder.build(allowNullValues));
}
// add the array of metrics to a top-level json object
@ -90,4 +91,7 @@ public class MetricsBuilder {
return metricsBuilder.build();
}
public JsonObject build() {
return build(false);
}
}

View File

@ -195,6 +195,15 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
.required(false)
.build();
static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder()
.name("include-null-values")
.displayName("Include Null Values")
.description("Indicate if null values should be included in records. Default will be false")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
protected volatile SiteToSiteClient siteToSiteClient;
protected volatile RecordSchema recordSchema;
@ -214,6 +223,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
properties.add(HTTP_PROXY_USERNAME);
properties.add(HTTP_PROXY_PASSWORD);
properties.add(RECORD_WRITER);
properties.add(ALLOW_NULL_VALUES);
return properties;
}
@ -327,33 +337,35 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
protected void addField(final JsonObjectBuilder builder, final String key, final Boolean value, final boolean allowNullValues) {
if (value != null) {
builder.add(key, value.longValue());
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
if (value != null) {
builder.add(key, value.intValue());
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
if (value == null) {
return;
}
builder.add(key, value);
}
protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
if (value == null) {
if (allowNullValues) {
builder.add(key, JsonValue.NULL);
}
} else {
builder.add(key, value);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final Long value, boolean allowNullValues) {
if (value != null) {
builder.add(key, value);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final Integer value, boolean allowNullValues) {
if (value != null) {
builder.add(key, value);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}
protected void addField(final JsonObjectBuilder builder, final String key, final String value, boolean allowNullValues) {
if (value != null) {
builder.add(key, value);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}

View File

@ -17,6 +17,27 @@
package org.apache.nifi.reporting;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
@ -33,27 +54,6 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.scheduling.SchedulingStrategy;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Tags({"bulletin", "site", "site to site"})
@CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to "
+ "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins "
@ -126,6 +126,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
}
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
@ -140,7 +141,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final Bulletin bulletin : bulletins) {
if(bulletin.getId() > lastSentBulletinId) {
arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId));
arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId, allowNullValues));
}
}
final JsonArray jsonArray = arrayBuilder.build();
@ -176,22 +177,22 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
}
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
final String platform, final String nodeIdentifier) {
final String platform, final String nodeIdentifier, Boolean allowNullValues) {
addField(builder, "objectId", UUID.randomUUID().toString());
addField(builder, "platform", platform);
addField(builder, "bulletinId", bulletin.getId());
addField(builder, "bulletinCategory", bulletin.getCategory());
addField(builder, "bulletinGroupId", bulletin.getGroupId());
addField(builder, "bulletinGroupName", bulletin.getGroupName());
addField(builder, "bulletinLevel", bulletin.getLevel());
addField(builder, "bulletinMessage", bulletin.getMessage());
addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress());
addField(builder, "bulletinNodeId", nodeIdentifier);
addField(builder, "bulletinSourceId", bulletin.getSourceId());
addField(builder, "bulletinSourceName", bulletin.getSourceName());
addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name());
addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()));
addField(builder, "objectId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "bulletinId", bulletin.getId(), allowNullValues);
addField(builder, "bulletinCategory", bulletin.getCategory(), allowNullValues);
addField(builder, "bulletinGroupId", bulletin.getGroupId(), allowNullValues);
addField(builder, "bulletinGroupName", bulletin.getGroupName(), allowNullValues);
addField(builder, "bulletinLevel", bulletin.getLevel(), allowNullValues);
addField(builder, "bulletinMessage", bulletin.getMessage(), allowNullValues);
addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress(), allowNullValues);
addField(builder, "bulletinNodeId", nodeIdentifier, allowNullValues);
addField(builder, "bulletinSourceId", bulletin.getSourceId(), allowNullValues);
addField(builder, "bulletinSourceName", bulletin.getSourceName(), allowNullValues);
addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), allowNullValues);
addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()), allowNullValues);
return builder.build();
}

View File

@ -157,6 +157,7 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
if(status != null) {
final Map<String,String> statusMetrics = metricsService.getMetrics(status, false);
@ -179,13 +180,13 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
.addAllMetrics(jvmMetrics)
.metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors()))
.metric(MetricNames.LOAD1MN, String.valueOf(systemLoad >= 0 ? systemLoad : -1))
.build();
.build(allowNullValues);
data = metricsObject.toString().getBytes(StandardCharsets.UTF_8);
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
} else {
final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(),
hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1);
hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1, allowNullValues);
data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
}

View File

@ -17,6 +17,31 @@
package org.apache.nifi.reporting;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
@ -42,29 +67,6 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@ -289,6 +291,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final String hostname = url.getHost();
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
@ -305,7 +308,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final String componentName = mapHolder.getComponentName(event.getComponentId());
final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
final String processGroupName = mapHolder.getComponentName(processGroupId);
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId));
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId, allowNullValues));
}
final JsonArray jsonArray = arrayBuilder.build();
@ -341,29 +344,29 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
final String platform, final String nodeIdentifier) {
addField(builder, "eventId", UUID.randomUUID().toString());
addField(builder, "eventOrdinal", event.getEventId());
addField(builder, "eventType", event.getEventType().name());
addField(builder, "timestampMillis", event.getEventTime());
addField(builder, "timestamp", df.format(event.getEventTime()));
addField(builder, "durationMillis", event.getEventDuration());
addField(builder, "lineageStart", event.getLineageStartDate());
addField(builder, "details", event.getDetails());
addField(builder, "componentId", event.getComponentId());
addField(builder, "componentType", event.getComponentType());
addField(builder, "componentName", componentName);
addField(builder, "processGroupId", processGroupId, true);
addField(builder, "processGroupName", processGroupName, true);
addField(builder, "entityId", event.getFlowFileUuid());
addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile");
addField(builder, "entitySize", event.getFileSize());
addField(builder, "previousEntitySize", event.getPreviousFileSize());
addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes());
addField(builder, factory, "previousAttributes", event.getPreviousAttributes());
final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
final String platform, final String nodeIdentifier, Boolean allowNullValues) {
addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
addField(builder, "eventType", event.getEventType().name(), allowNullValues);
addField(builder, "timestampMillis", event.getEventTime(), allowNullValues);
addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues);
addField(builder, "durationMillis", event.getEventDuration(), allowNullValues);
addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues);
addField(builder, "details", event.getDetails(), allowNullValues);
addField(builder, "componentId", event.getComponentId(), allowNullValues);
addField(builder, "componentType", event.getComponentType(), allowNullValues);
addField(builder, "componentName", componentName, allowNullValues);
addField(builder, "processGroupId", processGroupId, allowNullValues);
addField(builder, "processGroupName", processGroupName, allowNullValues);
addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues);
addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues);
addField(builder, "entitySize", event.getFileSize(), allowNullValues);
addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues);
addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
addField(builder, "actorHostname", hostname);
addField(builder, "actorHostname", hostname, allowNullValues);
if (nifiUrl != null) {
// TO get URL Prefix, we just remove the /nifi from the end of the URL. We know that the URL ends with
// "/nifi" because the Property Validator enforces it
@ -372,44 +375,51 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId() + "/content/";
final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix);
addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix);
addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues);
addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix, allowNullValues);
}
addField(builder, factory, "parentIds", event.getParentUuids());
addField(builder, factory, "childIds", event.getChildUuids());
addField(builder, "transitUri", event.getTransitUri());
addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier());
addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri());
addField(builder, "platform", platform);
addField(builder, "application", applicationName);
addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
addField(builder, "transitUri", event.getTransitUri(), allowNullValues);
addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues);
addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "application", applicationName, allowNullValues);
return builder.build();
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) {
if (values == null) {
return;
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values, Boolean allowNullValues) {
if (values != null) {
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, String> entry : values.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, String> entry : values.entrySet()) {
if (entry.getKey() == null ) {
continue;
}else if(entry.getValue() == null ){
if(allowNullValues) {
mapBuilder.add(entry.getKey(), JsonValue.NULL);
}
}else {
mapBuilder.add(entry.getKey(), entry.getValue());
}
}
mapBuilder.add(entry.getKey(), entry.getValue());
}
builder.add(key, mapBuilder);
builder.add(key, mapBuilder);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}
private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
if (values == null) {
return;
private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values, Boolean allowNullValues) {
if (values != null) {
builder.add(key, createJsonArray(factory, values));
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
builder.add(key, createJsonArray(factory, values));
}
private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {

View File

@ -135,6 +135,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
final String hostname = url.getHost();
final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
final Map<String, ?> config = Collections.emptyMap();
final JsonBuilderFactory factory = Json.createBuilderFactory(config);
@ -144,7 +145,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName,
platform, null, new Date());
platform, null, new Date(), allowNullValues);
final JsonArray jsonArray = arrayBuilder.build();
@ -227,182 +228,186 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
* The configured platform
* @param parentId
* The parent's component id
* @param currentDate
* The current date
* @param allowNullValues
* Allow null values
*/
private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final ProcessGroupStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId());
addField(builder, "bytesRead", status.getBytesRead());
addField(builder, "bytesWritten", status.getBytesWritten());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "bytesTransferred", status.getBytesTransferred());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred());
addField(builder, "inputContentSize", status.getInputContentSize());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputContentSize", status.getOutputContentSize());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "queuedContentSize", status.getQueuedContentSize());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
addField(builder, "queuedCount", status.getQueuedCount());
addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name());
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "bytesRead", status.getBytesRead(), allowNullValues);
addField(builder, "bytesWritten", status.getBytesWritten(), allowNullValues);
addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues);
addField(builder, "bytesSent", status.getBytesSent(), allowNullValues);
addField(builder, "bytesTransferred", status.getBytesTransferred(), allowNullValues);
addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues);
addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues);
addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred(), allowNullValues);
addField(builder, "inputContentSize", status.getInputContentSize(), allowNullValues);
addField(builder, "inputCount", status.getInputCount(), allowNullValues);
addField(builder, "outputContentSize", status.getOutputContentSize(), allowNullValues);
addField(builder, "outputCount", status.getOutputCount(), allowNullValues);
addField(builder, "queuedContentSize", status.getQueuedContentSize(), allowNullValues);
addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues);
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount(), allowNullValues);
addField(builder, "queuedCount", status.getQueuedCount(), allowNullValues);
addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name(), allowNullValues);
arrayBuilder.add(builder.build());
}
for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
applicationName, platform, status.getId(), currentDate, allowNullValues);
}
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
applicationName, platform, status.getId(), currentDate, allowNullValues);
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
applicationName, platform, status.getId(), currentDate, allowNullValues);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate);
hostname, applicationName, platform, status.getId(), currentDate, allowNullValues);
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate);
hostname, applicationName, platform, status.getId(), currentDate, allowNullValues);
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate);
applicationName, platform, status.getId(), currentDate, allowNullValues);
}
}
private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
final String platform, final String parentId, final Date currentDate) {
final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "RemoteProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId());
addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount());
addField(builder, "receivedContentSize", status.getReceivedContentSize());
addField(builder, "receivedCount", status.getReceivedCount());
addField(builder, "sentContentSize", status.getSentContentSize());
addField(builder, "sentCount", status.getSentCount());
addField(builder, "averageLineageDuration", status.getAverageLineageDuration());
addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name());
addField(builder, "targetURI", status.getTargetUri());
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount(), allowNullValues);
addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues);
addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount(), allowNullValues);
addField(builder, "receivedContentSize", status.getReceivedContentSize(), allowNullValues);
addField(builder, "receivedCount", status.getReceivedCount(), allowNullValues);
addField(builder, "sentContentSize", status.getSentContentSize(), allowNullValues);
addField(builder, "sentCount", status.getSentCount(), allowNullValues);
addField(builder, "averageLineageDuration", status.getAverageLineageDuration(), allowNullValues);
addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name(), allowNullValues);
addField(builder, "targetURI", status.getTargetUri(), allowNullValues);
arrayBuilder.add(builder.build());
}
}
private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
addField(builder, "transmitting", status.isTransmitting());
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues);
addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues);
addField(builder, "bytesSent", status.getBytesSent(), allowNullValues);
addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues);
addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues);
addField(builder, "inputBytes", status.getInputBytes(), allowNullValues);
addField(builder, "inputCount", status.getInputCount(), allowNullValues);
addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues);
addField(builder, "outputCount", status.getOutputCount(), allowNullValues);
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name(), allowNullValues);
addField(builder, "transmitting", status.isTransmitting(), allowNullValues);
arrayBuilder.add(builder.build());
}
}
private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Connection";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
componentType, componentName);
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId());
addField(builder, "sourceId", status.getSourceId());
addField(builder, "sourceName", status.getSourceName());
addField(builder, "destinationId", status.getDestinationId());
addField(builder, "destinationName", status.getDestinationName());
addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes());
addField(builder, "maxQueuedCount", status.getMaxQueuedCount());
addField(builder, "queuedBytes", status.getQueuedBytes());
addField(builder, "queuedCount", status.getQueuedCount());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold());
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "sourceId", status.getSourceId(), allowNullValues);
addField(builder, "sourceName", status.getSourceName(), allowNullValues);
addField(builder, "destinationId", status.getDestinationId(), allowNullValues);
addField(builder, "destinationName", status.getDestinationName(), allowNullValues);
addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes(), allowNullValues);
addField(builder, "maxQueuedCount", status.getMaxQueuedCount(), allowNullValues);
addField(builder, "queuedBytes", status.getQueuedBytes(), allowNullValues);
addField(builder, "queuedCount", status.getQueuedCount(), allowNullValues);
addField(builder, "inputBytes", status.getInputBytes(), allowNullValues);
addField(builder, "inputCount", status.getInputCount(), allowNullValues);
addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues);
addField(builder, "outputCount", status.getOutputCount(), allowNullValues);
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold(), allowNullValues);
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold(), allowNullValues);
addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold(), allowNullValues);
addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())), allowNullValues);
arrayBuilder.add(builder.build());
}
}
private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Processor";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId());
addField(builder, "processorType", status.getType());
addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration());
addField(builder, "bytesRead", status.getBytesRead());
addField(builder, "bytesWritten", status.getBytesWritten());
addField(builder, "bytesReceived", status.getBytesReceived());
addField(builder, "bytesSent", status.getBytesSent());
addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved());
addField(builder, "flowFilesReceived", status.getFlowFilesReceived());
addField(builder, "flowFilesSent", status.getFlowFilesSent());
addField(builder, "inputCount", status.getInputCount());
addField(builder, "inputBytes", status.getInputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "activeThreadCount", status.getActiveThreadCount());
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount());
addField(builder, "invocations", status.getInvocations());
addField(builder, "processingNanos", status.getProcessingNanos());
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name());
addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name());
addField(builder, factory, "counters", status.getCounters());
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "processorType", status.getType(), allowNullValues);
addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration(), allowNullValues);
addField(builder, "bytesRead", status.getBytesRead(), allowNullValues);
addField(builder, "bytesWritten", status.getBytesWritten(), allowNullValues);
addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues);
addField(builder, "bytesSent", status.getBytesSent(), allowNullValues);
addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved(), allowNullValues);
addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues);
addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues);
addField(builder, "inputCount", status.getInputCount(), allowNullValues);
addField(builder, "inputBytes", status.getInputBytes(), allowNullValues);
addField(builder, "outputCount", status.getOutputCount(), allowNullValues);
addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues);
addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues);
addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount(), allowNullValues);
addField(builder, "invocations", status.getInvocations(), allowNullValues);
addField(builder, "processingNanos", status.getProcessingNanos(), allowNullValues);
addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name(), allowNullValues);
addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name(), allowNullValues);
addField(builder, factory, "counters", status.getCounters(), allowNullValues);
arrayBuilder.add(builder.build());
}
@ -410,40 +415,40 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
final String applicationName, final String platform, final String parentId, final Date currentDate,
final String componentType, final String componentName) {
addField(builder, "statusId", UUID.randomUUID().toString());
addField(builder, "timestampMillis", currentDate.getTime());
addField(builder, "timestamp", df.format(currentDate));
addField(builder, "actorHostname", hostname);
addField(builder, "componentType", componentType);
addField(builder, "componentName", componentName);
addField(builder, "parentId", parentId);
addField(builder, "platform", platform);
addField(builder, "application", applicationName);
final String componentType, final String componentName, Boolean allowNullValues) {
addField(builder, "statusId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "timestampMillis", currentDate.getTime(), allowNullValues);
addField(builder, "timestamp", df.format(currentDate), allowNullValues);
addField(builder, "actorHostname", hostname, allowNullValues);
addField(builder, "componentType", componentType, allowNullValues);
addField(builder, "componentName", componentName, allowNullValues);
addField(builder, "parentId", parentId, allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "application", applicationName, allowNullValues);
}
private void addField(final JsonObjectBuilder builder, final String key, final Boolean value) {
if (value == null) {
return;
}
builder.add(key, value);
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, Long> values, final Boolean allowNullValues) {
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, Long> values) {
if (values == null) {
return;
}
if (values != null) {
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, Long> entry : values.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
for (final Map.Entry<String, Long> entry : values.entrySet()) {
if (entry.getKey() == null ) {
continue;
}else if(entry.getValue() == null ){
if(allowNullValues)
mapBuilder.add(entry.getKey(),JsonValue.NULL);
}else{
mapBuilder.add(entry.getKey(), entry.getValue());
}
}
mapBuilder.add(entry.getKey(), entry.getValue());
}
builder.add(key, mapBuilder);
builder.add(key, mapBuilder);
}else if(allowNullValues){
builder.add(key,JsonValue.NULL);
}
}
}

View File

@ -17,6 +17,25 @@
package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -33,22 +52,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestSiteToSiteBulletinReportingTask {
@Test
@ -120,6 +123,54 @@ public class TestSiteToSiteBulletinReportingTask {
JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0);
assertEquals("message", bulletinJson.getString("bulletinMessage"));
assertEquals("group-name", bulletinJson.getString("bulletinGroupName"));
assertNull( bulletinJson.get("bulletinSourceType"));
}
@Test
public void testSerializedFormWithNullValues() throws IOException, InitializationException {
// creating the list of bulletins
final List<Bulletin> bulletins = new ArrayList<Bulletin>();
bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", "source-name", "category", "severity", "message"));
// mock the access to the list of bulletins
final ReportingContext context = Mockito.mock(ReportingContext.class);
final BulletinRepository repository = Mockito.mock(BulletinRepository.class);
Mockito.when(context.getBulletinRepository()).thenReturn(repository);
Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
// creating reporting task
final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask();
// settings properties and mocking access to properties
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
Mockito.doAnswer(new Answer<PropertyValue>() {
@Override
public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
}
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
// setup the mock initialization context
final ComponentLog logger = Mockito.mock(ComponentLog.class);
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
task.initialize(initContext);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0);
assertEquals(JsonValue.NULL, bulletinJson.get("bulletinSourceType"));
}
private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {

View File

@ -74,7 +74,7 @@ public class TestSiteToSiteMetricsReportingTask {
status.setBytesSent(20000);
status.setQueuedCount(100);
status.setQueuedContentSize(1024L);
status.setBytesRead(60000L);
status.setBytesRead(null);
status.setBytesWritten(80000L);
status.setActiveThreadCount(5);
@ -241,6 +241,34 @@ public class TestSiteToSiteMetricsReportingTask {
fail();
}
@Test
public void testAmbariFormatWithNullValues() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
properties.put(SiteToSiteMetricsReportingTask.ALLOW_NULL_VALUES, "true");
MockSiteToSiteMetricsReportingTask task = initTask(properties);
task.onTrigger(context);
assertEquals(1, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonArray array = jsonReader.readObject().getJsonArray("metrics");
for(int i = 0; i < array.size(); i++) {
JsonObject object = array.getJsonObject(i);
assertEquals("nifi", object.getString("appid"));
assertEquals("1234", object.getString("instanceid"));
if(object.getString("metricname").equals("BytesReadLast5Minutes")) {
for(Entry<String, JsonValue> kv : object.getJsonObject("metrics").entrySet()) {
assertEquals("\"null\"", kv.getValue().toString());
}
return;
}
}
fail();
}
@Test
public void testRecordFormat() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();

View File

@ -18,8 +18,26 @@
package org.apache.nifi.reporting;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
@ -46,22 +64,8 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class TestSiteToSiteProvenanceReportingTask {
@ -210,10 +214,38 @@ public class TestSiteToSiteProvenanceReportingTask {
assertEquals(3, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes");
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue details = object.get("details");
JsonObject msgArray = object.getJsonObject("updatedAttributes");
assertNull(details);
assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc"));
assertNull(msgArray.get("emptyVal"));
}
@Test
public void testSerializedFormWithNullValues() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
ProvenanceEventRecord event = createProvenanceEventRecord();
MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
task.initialize(initContext);
task.onScheduled(confContext);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue details = object.get("details");
JsonValue emptyVal = object.getJsonObject("updatedAttributes").get("emptyVal");
assertEquals(JsonValue.NULL,details);
assertEquals(JsonValue.NULL,emptyVal);
}
@Test
public void testFilterComponentIdSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
@ -618,6 +650,7 @@ public class TestSiteToSiteProvenanceReportingTask {
attributes.put("abc", "xyz");
attributes.put("xyz", "abc");
attributes.put("filename", "file-" + uuid);
attributes.put("emptyVal",null);
final Map<String, String> prevAttrs = new HashMap<>();
attributes.put("filename", "1234.xyz");

View File

@ -21,6 +21,7 @@ package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -37,6 +38,7 @@ import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonString;
import javax.json.JsonValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@ -165,8 +167,30 @@ public class TestSiteToSiteStatusReportingTask {
JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold");
assertEquals("1 KB", dataSizeThreshold.getString());
assertEquals(1024, bytesThreshold.intValue());
assertNull(object.get("destinationName"));
}
@Test
public void testConnectionStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue destination = object.get("destinationName");
assertEquals(destination, JsonValue.NULL);
}
@Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
@ -207,12 +231,13 @@ public class TestSiteToSiteStatusReportingTask {
@Test
public void testPortStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"false");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
@ -226,6 +251,26 @@ public class TestSiteToSiteStatusReportingTask {
assertFalse(isTransmitting);
JsonNumber inputBytes = object.getJsonNumber("inputBytes");
assertEquals(5, inputBytes.intValue());
assertNull(object.get("activeThreadCount"));
}
@Test
public void testPortStatusWithNullValues() throws IOException, InitializationException {
ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue activeThreadCount = object.get("activeThreadCount");
assertEquals(activeThreadCount, JsonValue.NULL);
}
@Test
@ -250,6 +295,27 @@ public class TestSiteToSiteStatusReportingTask {
assertEquals("Transmitting", transmissionStatus.getString());
}
@Test
public void testRemoteProcessGroupStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
assertEquals(3, task.dataSent.size());
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject firstElement = jsonReader.readArray().getJsonObject(0);
JsonValue targetURI = firstElement.get("targetURI");
assertEquals(targetURI, JsonValue.NULL);
}
@Test
public void testProcessorStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
@ -273,6 +339,27 @@ public class TestSiteToSiteStatusReportingTask {
assertNotNull(counterMap);
assertEquals(10, counterMap.getInt("counter1"));
assertEquals(5, counterMap.getInt("counter2"));
assertNull(object.get("processorType"));
}
@Test
public void testProcessorStatusWithNullValues() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)");
properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true");
MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonValue type = object.get("processorType");
assertEquals(type, JsonValue.NULL);
}
/***********************************
@ -319,7 +406,6 @@ public class TestSiteToSiteStatusReportingTask {
pgStatus.setRemoteProcessGroupStatus(rpgStatus);
pgStatus.setProcessorStatus(pStatus);
pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE);
pgStatus.setActiveThreadCount(1);
pgStatus.setBytesRead(2L);
pgStatus.setBytesReceived(3l);
@ -345,7 +431,7 @@ public class TestSiteToSiteStatusReportingTask {
PortStatus pStatus = new PortStatus();
pStatus.setId(id);
pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString());
pStatus.setActiveThreadCount(0);
pStatus.setActiveThreadCount(null);
pStatus.setBytesReceived(1l);
pStatus.setBytesSent(2l);
pStatus.setFlowFilesReceived(3);
@ -379,7 +465,7 @@ public class TestSiteToSiteStatusReportingTask {
pStatus.setOutputBytes(12l);
pStatus.setOutputCount(13);
pStatus.setProcessingNanos(14l);
pStatus.setType("type");
pStatus.setType(null);
pStatus.setTerminatedThreadCount(1);
pStatus.setRunStatus(RunStatus.Running);
pStatus.setCounters(new HashMap<String, Long>() {{
@ -402,7 +488,7 @@ public class TestSiteToSiteStatusReportingTask {
rpgStatus.setReceivedCount(5);
rpgStatus.setSentContentSize(6l);
rpgStatus.setSentCount(7);
rpgStatus.setTargetUri("uri");
rpgStatus.setTargetUri(null);
rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
return rpgStatus;
@ -425,7 +511,7 @@ public class TestSiteToSiteStatusReportingTask {
cStatus.setSourceId(id);
cStatus.setSourceName("source");
cStatus.setDestinationId(id);
cStatus.setDestinationName("destination");
cStatus.setDestinationName(null);
return cStatus;
}