diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java index 593d329b92..44ae480b46 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; @@ -187,8 +188,18 @@ public class MetricsService { return metrics; } + private boolean addEmptyValue(MapmetricsMap, String metricskey, JsonObjectBuilder objectBuilder, boolean allowNullValues){ + if(metricsMap.get(metricskey) == null){ + if(allowNullValues) { + objectBuilder.add(metricskey, JsonValue.NULL); + } + return true; + } + return false; + } + public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, JvmMetrics virtualMachineMetrics, - String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) { + String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad, boolean allowNullValues) { JsonObjectBuilder objectBuilder = factory.createObjectBuilder() .add(MetricFields.APP_ID, applicationId) .add(MetricFields.HOSTNAME, hostname) @@ -201,27 +212,38 @@ public class MetricsService { Map integerMetrics = getIntegerMetrics(virtualMachineMetrics); for (String key : integerMetrics.keySet()) { - objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key)); + if(!addEmptyValue(integerMetrics,key,objectBuilder,allowNullValues)) { + objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key)); + } } Map longMetrics = getLongMetrics(virtualMachineMetrics); for (String key : longMetrics.keySet()) { - objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key)); + if(!addEmptyValue(longMetrics,key,objectBuilder,allowNullValues)) { + objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key)); + } } Map doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { - objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key)); + if(!addEmptyValue(doubleMetrics,key,objectBuilder,allowNullValues)){ + objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key)); + } } Map longPgMetrics = getLongMetrics(status, false); for (String key : longPgMetrics.keySet()) { - objectBuilder.add(key, longPgMetrics.get(key)); + if(!addEmptyValue(longPgMetrics,key,objectBuilder,allowNullValues)) { + objectBuilder.add(key, longPgMetrics.get(key)); + } } Map integerPgMetrics = getIntegerMetrics(status, false); for (String key : integerPgMetrics.keySet()) { - objectBuilder.add(key, integerPgMetrics.get(key)); + if(!addEmptyValue(integerPgMetrics,key,objectBuilder,allowNullValues)) { + objectBuilder.add(key, integerPgMetrics.get(key)); + } } return objectBuilder.build(); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java index 81fb0219c9..2e48d0c4a4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java @@ -18,6 +18,8 @@ package org.apache.nifi.reporting.util.metrics.api; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; /** * Builds the JsonObject for an individual metric. @@ -67,7 +69,11 @@ public class MetricBuilder { return this; } - public JsonObject build() { + public JsonObject build(final boolean allowNullValues) { + JsonObjectBuilder metricValueBuilder = this.metricValue == null && allowNullValues? factory.createObjectBuilder() + .add(String.valueOf(timestamp), JsonValue.NULL): factory.createObjectBuilder() + .add(String.valueOf(timestamp), this.metricValue); + return factory.createObjectBuilder() .add(MetricFields.METRIC_NAME, metricName) .add(MetricFields.APP_ID, applicationId) @@ -75,10 +81,12 @@ public class MetricBuilder { .add(MetricFields.HOSTNAME, hostname) .add(MetricFields.TIMESTAMP, timestamp) .add(MetricFields.START_TIME, timestamp) - .add(MetricFields.METRICS, - factory.createObjectBuilder() - .add(String.valueOf(timestamp), metricValue) - ).build(); + .add(MetricFields.METRICS, metricValueBuilder) + .build(); + } + + public JsonObject build() { + return build(false); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java index 36947206da..f26248fe67 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java @@ -16,12 +16,13 @@ */ package org.apache.nifi.reporting.util.metrics.api; +import java.util.HashMap; +import java.util.Map; + import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; -import java.util.HashMap; -import java.util.Map; /** * Builds the overall JsonObject for the Metrics. @@ -72,7 +73,7 @@ public class MetricsBuilder { return this; } - public JsonObject build() { + public JsonObject build(final boolean allowNullValues) { // builds JsonObject for individual metrics final MetricBuilder metricBuilder = new MetricBuilder(factory); metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); @@ -81,7 +82,7 @@ public class MetricsBuilder { for (Map.Entry entry : metrics.entrySet()) { metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); - metricArrayBuilder.add(metricBuilder.build()); + metricArrayBuilder.add(metricBuilder.build(allowNullValues)); } // add the array of metrics to a top-level json object @@ -90,4 +91,7 @@ public class MetricsBuilder { return metricsBuilder.build(); } + public JsonObject build() { + return build(false); + } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index a4086cce7e..dce16a2e32 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -195,6 +195,15 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .required(false) .build(); + static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder() + .name("include-null-values") + .displayName("Include Null Values") + .description("Indicate if null values should be included in records. Default will be false") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + protected volatile SiteToSiteClient siteToSiteClient; protected volatile RecordSchema recordSchema; @@ -214,6 +223,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT properties.add(HTTP_PROXY_USERNAME); properties.add(HTTP_PROXY_PASSWORD); properties.add(RECORD_WRITER); + properties.add(ALLOW_NULL_VALUES); return properties; } @@ -327,33 +337,35 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT } } - protected void addField(final JsonObjectBuilder builder, final String key, final Long value) { + protected void addField(final JsonObjectBuilder builder, final String key, final Boolean value, final boolean allowNullValues) { if (value != null) { - builder.add(key, value.longValue()); - } - } - - protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) { - if (value != null) { - builder.add(key, value.intValue()); - } - } - - protected void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - - builder.add(key, value); - } - - protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { - if (value == null) { - if (allowNullValues) { - builder.add(key, JsonValue.NULL); - } - } else { builder.add(key, value); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final Long value, boolean allowNullValues) { + if (value != null) { + builder.add(key, value); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final Integer value, boolean allowNullValues) { + if (value != null) { + builder.add(key, value); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value, boolean allowNullValues) { + if (value != null) { + builder.add(key, value); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index ac60d8a092..404150b3ab 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -17,6 +17,27 @@ package org.apache.nifi.reporting; +import java.io.IOException; +import java.io.InputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; + import org.apache.avro.Schema; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; @@ -33,27 +54,6 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.scheduling.SchedulingStrategy; -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; - -import java.io.IOException; -import java.io.InputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - @Tags({"bulletin", "site", "site to site"}) @CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to " + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins " @@ -126,6 +126,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting } final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean(); final Map config = Collections.emptyMap(); final JsonBuilderFactory factory = Json.createBuilderFactory(config); @@ -140,7 +141,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); for (final Bulletin bulletin : bulletins) { if(bulletin.getId() > lastSentBulletinId) { - arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId)); + arrayBuilder.add(serialize(factory, builder, bulletin, df, platform, nodeId, allowNullValues)); } } final JsonArray jsonArray = arrayBuilder.build(); @@ -176,22 +177,22 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting } private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, - final String platform, final String nodeIdentifier) { + final String platform, final String nodeIdentifier, Boolean allowNullValues) { - addField(builder, "objectId", UUID.randomUUID().toString()); - addField(builder, "platform", platform); - addField(builder, "bulletinId", bulletin.getId()); - addField(builder, "bulletinCategory", bulletin.getCategory()); - addField(builder, "bulletinGroupId", bulletin.getGroupId()); - addField(builder, "bulletinGroupName", bulletin.getGroupName()); - addField(builder, "bulletinLevel", bulletin.getLevel()); - addField(builder, "bulletinMessage", bulletin.getMessage()); - addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress()); - addField(builder, "bulletinNodeId", nodeIdentifier); - addField(builder, "bulletinSourceId", bulletin.getSourceId()); - addField(builder, "bulletinSourceName", bulletin.getSourceName()); - addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name()); - addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp())); + addField(builder, "objectId", UUID.randomUUID().toString(), allowNullValues); + addField(builder, "platform", platform, allowNullValues); + addField(builder, "bulletinId", bulletin.getId(), allowNullValues); + addField(builder, "bulletinCategory", bulletin.getCategory(), allowNullValues); + addField(builder, "bulletinGroupId", bulletin.getGroupId(), allowNullValues); + addField(builder, "bulletinGroupName", bulletin.getGroupName(), allowNullValues); + addField(builder, "bulletinLevel", bulletin.getLevel(), allowNullValues); + addField(builder, "bulletinMessage", bulletin.getMessage(), allowNullValues); + addField(builder, "bulletinNodeAddress", bulletin.getNodeAddress(), allowNullValues); + addField(builder, "bulletinNodeId", nodeIdentifier, allowNullValues); + addField(builder, "bulletinSourceId", bulletin.getSourceId(), allowNullValues); + addField(builder, "bulletinSourceName", bulletin.getSourceName(), allowNullValues); + addField(builder, "bulletinSourceType", bulletin.getSourceType() == null ? null : bulletin.getSourceType().name(), allowNullValues); + addField(builder, "bulletinTimestamp", df.format(bulletin.getTimestamp()), allowNullValues); return builder.build(); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 10534338b8..830a1115e9 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -157,6 +157,7 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue(); final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); + final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean(); if(status != null) { final Map statusMetrics = metricsService.getMetrics(status, false); @@ -179,13 +180,13 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT .addAllMetrics(jvmMetrics) .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors())) .metric(MetricNames.LOAD1MN, String.valueOf(systemLoad >= 0 ? systemLoad : -1)) - .build(); + .build(allowNullValues); data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); } else { final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(), - hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1); + hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1, allowNullValues); data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 7fcc3b1b15..c7df2264a9 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -17,6 +17,31 @@ package org.apache.nifi.reporting; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; + import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.Restricted; @@ -42,29 +67,6 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer; -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - @Tags({"provenance", "lineage", "tracking", "site", "site to site"}) @CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") @Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.") @@ -289,6 +291,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final String hostname = url.getHost(); final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean(); final Map config = Collections.emptyMap(); final JsonBuilderFactory factory = Json.createBuilderFactory(config); @@ -305,7 +308,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final String componentName = mapHolder.getComponentName(event.getComponentId()); final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType()); final String processGroupName = mapHolder.getComponentName(processGroupId); - arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId)); + arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId, allowNullValues)); } final JsonArray jsonArray = arrayBuilder.build(); @@ -341,29 +344,29 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, - final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName, - final String platform, final String nodeIdentifier) { - addField(builder, "eventId", UUID.randomUUID().toString()); - addField(builder, "eventOrdinal", event.getEventId()); - addField(builder, "eventType", event.getEventType().name()); - addField(builder, "timestampMillis", event.getEventTime()); - addField(builder, "timestamp", df.format(event.getEventTime())); - addField(builder, "durationMillis", event.getEventDuration()); - addField(builder, "lineageStart", event.getLineageStartDate()); - addField(builder, "details", event.getDetails()); - addField(builder, "componentId", event.getComponentId()); - addField(builder, "componentType", event.getComponentType()); - addField(builder, "componentName", componentName); - addField(builder, "processGroupId", processGroupId, true); - addField(builder, "processGroupName", processGroupName, true); - addField(builder, "entityId", event.getFlowFileUuid()); - addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile"); - addField(builder, "entitySize", event.getFileSize()); - addField(builder, "previousEntitySize", event.getPreviousFileSize()); - addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes()); - addField(builder, factory, "previousAttributes", event.getPreviousAttributes()); + final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName, + final String platform, final String nodeIdentifier, Boolean allowNullValues) { + addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues); + addField(builder, "eventOrdinal", event.getEventId(), allowNullValues); + addField(builder, "eventType", event.getEventType().name(), allowNullValues); + addField(builder, "timestampMillis", event.getEventTime(), allowNullValues); + addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues); + addField(builder, "durationMillis", event.getEventDuration(), allowNullValues); + addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues); + addField(builder, "details", event.getDetails(), allowNullValues); + addField(builder, "componentId", event.getComponentId(), allowNullValues); + addField(builder, "componentType", event.getComponentType(), allowNullValues); + addField(builder, "componentName", componentName, allowNullValues); + addField(builder, "processGroupId", processGroupId, allowNullValues); + addField(builder, "processGroupName", processGroupName, allowNullValues); + addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues); + addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues); + addField(builder, "entitySize", event.getFileSize(), allowNullValues); + addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues); + addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues); + addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues); - addField(builder, "actorHostname", hostname); + addField(builder, "actorHostname", hostname, allowNullValues); if (nifiUrl != null) { // TO get URL Prefix, we just remove the /nifi from the end of the URL. We know that the URL ends with // "/nifi" because the Property Validator enforces it @@ -372,44 +375,51 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId() + "/content/"; final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier; - addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix); - addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix); + addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues); + addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix, allowNullValues); } - addField(builder, factory, "parentIds", event.getParentUuids()); - addField(builder, factory, "childIds", event.getChildUuids()); - addField(builder, "transitUri", event.getTransitUri()); - addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier()); - addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri()); - addField(builder, "platform", platform); - addField(builder, "application", applicationName); + addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues); + addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues); + addField(builder, "transitUri", event.getTransitUri(), allowNullValues); + addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues); + addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues); + addField(builder, "platform", platform, allowNullValues); + addField(builder, "application", applicationName, allowNullValues); return builder.build(); } - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values) { - if (values == null) { - return; - } + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values, Boolean allowNullValues) { + if (values != null) { - final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); - for (final Map.Entry entry : values.entrySet()) { - if (entry.getKey() == null || entry.getValue() == null) { - continue; + final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); + for (final Map.Entry entry : values.entrySet()) { + + if (entry.getKey() == null ) { + continue; + }else if(entry.getValue() == null ){ + if(allowNullValues) { + mapBuilder.add(entry.getKey(), JsonValue.NULL); + } + }else { + mapBuilder.add(entry.getKey(), entry.getValue()); + } } - mapBuilder.add(entry.getKey(), entry.getValue()); - } + builder.add(key, mapBuilder); - builder.add(key, mapBuilder); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); + } } - private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection values) { - if (values == null) { - return; + private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection values, Boolean allowNullValues) { + if (values != null) { + builder.add(key, createJsonArray(factory, values)); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); } - - builder.add(key, createJsonArray(factory, values)); } private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection values) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 1c3f81065f..43e82efb2c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -135,6 +135,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa final String hostname = url.getHost(); final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean(); final Map config = Collections.emptyMap(); final JsonBuilderFactory factory = Json.createBuilderFactory(config); @@ -144,7 +145,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName, - platform, null, new Date()); + platform, null, new Date(), allowNullValues); final JsonArray jsonArray = arrayBuilder.build(); @@ -227,182 +228,186 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa * The configured platform * @param parentId * The parent's component id + * @param currentDate + * The current date + * @param allowNullValues + * Allow null values */ private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessGroupStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, - componentType, componentName); + componentType, componentName, allowNullValues); - addField(builder, "componentId", status.getId()); - addField(builder, "bytesRead", status.getBytesRead()); - addField(builder, "bytesWritten", status.getBytesWritten()); - addField(builder, "bytesReceived", status.getBytesReceived()); - addField(builder, "bytesSent", status.getBytesSent()); - addField(builder, "bytesTransferred", status.getBytesTransferred()); - addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); - addField(builder, "flowFilesSent", status.getFlowFilesSent()); - addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred()); - addField(builder, "inputContentSize", status.getInputContentSize()); - addField(builder, "inputCount", status.getInputCount()); - addField(builder, "outputContentSize", status.getOutputContentSize()); - addField(builder, "outputCount", status.getOutputCount()); - addField(builder, "queuedContentSize", status.getQueuedContentSize()); - addField(builder, "activeThreadCount", status.getActiveThreadCount()); - addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount()); - addField(builder, "queuedCount", status.getQueuedCount()); - addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name()); + addField(builder, "componentId", status.getId(), allowNullValues); + addField(builder, "bytesRead", status.getBytesRead(), allowNullValues); + addField(builder, "bytesWritten", status.getBytesWritten(), allowNullValues); + addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues); + addField(builder, "bytesSent", status.getBytesSent(), allowNullValues); + addField(builder, "bytesTransferred", status.getBytesTransferred(), allowNullValues); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues); + addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues); + addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred(), allowNullValues); + addField(builder, "inputContentSize", status.getInputContentSize(), allowNullValues); + addField(builder, "inputCount", status.getInputCount(), allowNullValues); + addField(builder, "outputContentSize", status.getOutputContentSize(), allowNullValues); + addField(builder, "outputCount", status.getOutputCount(), allowNullValues); + addField(builder, "queuedContentSize", status.getQueuedContentSize(), allowNullValues); + addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues); + addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount(), allowNullValues); + addField(builder, "queuedCount", status.getQueuedCount(), allowNullValues); + addField(builder, "versionedFlowState", status.getVersionedFlowState() == null ? null : status.getVersionedFlowState().name(), allowNullValues); arrayBuilder.add(builder.build()); } for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname, - applicationName, platform, status.getId(), currentDate); + applicationName, platform, status.getId(), currentDate, allowNullValues); } for(ProcessorStatus processorStatus : status.getProcessorStatus()) { serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname, - applicationName, platform, status.getId(), currentDate); + applicationName, platform, status.getId(), currentDate, allowNullValues); } for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname, - applicationName, platform, status.getId(), currentDate); + applicationName, platform, status.getId(), currentDate, allowNullValues); } for(PortStatus portStatus : status.getInputPortStatus()) { serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df, - hostname, applicationName, platform, status.getId(), currentDate); + hostname, applicationName, platform, status.getId(), currentDate, allowNullValues); } for(PortStatus portStatus : status.getOutputPortStatus()) { serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df, - hostname, applicationName, platform, status.getId(), currentDate); + hostname, applicationName, platform, status.getId(), currentDate, allowNullValues); } for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname, - applicationName, platform, status.getId(), currentDate); + applicationName, platform, status.getId(), currentDate, allowNullValues); } } private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, - final String platform, final String parentId, final Date currentDate) { + final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "RemoteProcessGroup"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, - componentType, componentName); + componentType, componentName, allowNullValues); - addField(builder, "componentId", status.getId()); - addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount()); - addField(builder, "activeThreadCount", status.getActiveThreadCount()); - addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount()); - addField(builder, "receivedContentSize", status.getReceivedContentSize()); - addField(builder, "receivedCount", status.getReceivedCount()); - addField(builder, "sentContentSize", status.getSentContentSize()); - addField(builder, "sentCount", status.getSentCount()); - addField(builder, "averageLineageDuration", status.getAverageLineageDuration()); - addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name()); - addField(builder, "targetURI", status.getTargetUri()); + addField(builder, "componentId", status.getId(), allowNullValues); + addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount(), allowNullValues); + addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues); + addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount(), allowNullValues); + addField(builder, "receivedContentSize", status.getReceivedContentSize(), allowNullValues); + addField(builder, "receivedCount", status.getReceivedCount(), allowNullValues); + addField(builder, "sentContentSize", status.getSentContentSize(), allowNullValues); + addField(builder, "sentCount", status.getSentCount(), allowNullValues); + addField(builder, "averageLineageDuration", status.getAverageLineageDuration(), allowNullValues); + addField(builder, "transmissionStatus", status.getTransmissionStatus() == null ? null : status.getTransmissionStatus().name(), allowNullValues); + addField(builder, "targetURI", status.getTargetUri(), allowNullValues); arrayBuilder.add(builder.build()); } } private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, - final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, - componentType, componentName); + componentType, componentName, allowNullValues); - addField(builder, "componentId", status.getId()); - addField(builder, "activeThreadCount", status.getActiveThreadCount()); - addField(builder, "bytesReceived", status.getBytesReceived()); - addField(builder, "bytesSent", status.getBytesSent()); - addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); - addField(builder, "flowFilesSent", status.getFlowFilesSent()); - addField(builder, "inputBytes", status.getInputBytes()); - addField(builder, "inputCount", status.getInputCount()); - addField(builder, "outputBytes", status.getOutputBytes()); - addField(builder, "outputCount", status.getOutputCount()); - addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name()); - addField(builder, "transmitting", status.isTransmitting()); + addField(builder, "componentId", status.getId(), allowNullValues); + addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues); + addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues); + addField(builder, "bytesSent", status.getBytesSent(), allowNullValues); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues); + addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues); + addField(builder, "inputBytes", status.getInputBytes(), allowNullValues); + addField(builder, "inputCount", status.getInputCount(), allowNullValues); + addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues); + addField(builder, "outputCount", status.getOutputCount(), allowNullValues); + addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name(), allowNullValues); + addField(builder, "transmitting", status.isTransmitting(), allowNullValues); arrayBuilder.add(builder.build()); } } private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Connection"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, - componentType, componentName); + componentType, componentName, allowNullValues); - addField(builder, "componentId", status.getId()); - addField(builder, "sourceId", status.getSourceId()); - addField(builder, "sourceName", status.getSourceName()); - addField(builder, "destinationId", status.getDestinationId()); - addField(builder, "destinationName", status.getDestinationName()); - addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes()); - addField(builder, "maxQueuedCount", status.getMaxQueuedCount()); - addField(builder, "queuedBytes", status.getQueuedBytes()); - addField(builder, "queuedCount", status.getQueuedCount()); - addField(builder, "inputBytes", status.getInputBytes()); - addField(builder, "inputCount", status.getInputCount()); - addField(builder, "outputBytes", status.getOutputBytes()); - addField(builder, "outputCount", status.getOutputCount()); - addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold()); - addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold()); - addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold()); + addField(builder, "componentId", status.getId(), allowNullValues); + addField(builder, "sourceId", status.getSourceId(), allowNullValues); + addField(builder, "sourceName", status.getSourceName(), allowNullValues); + addField(builder, "destinationId", status.getDestinationId(), allowNullValues); + addField(builder, "destinationName", status.getDestinationName(), allowNullValues); + addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes(), allowNullValues); + addField(builder, "maxQueuedCount", status.getMaxQueuedCount(), allowNullValues); + addField(builder, "queuedBytes", status.getQueuedBytes(), allowNullValues); + addField(builder, "queuedCount", status.getQueuedCount(), allowNullValues); + addField(builder, "inputBytes", status.getInputBytes(), allowNullValues); + addField(builder, "inputCount", status.getInputCount(), allowNullValues); + addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues); + addField(builder, "outputCount", status.getOutputCount(), allowNullValues); + addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold(), allowNullValues); + addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold(), allowNullValues); + addField(builder, "backPressureDataSizeThreshold", status.getBackPressureDataSizeThreshold(), allowNullValues); addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount()) - || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes()))); + || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())), allowNullValues); arrayBuilder.add(builder.build()); } } private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Processor"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName); + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName, allowNullValues); - addField(builder, "componentId", status.getId()); - addField(builder, "processorType", status.getType()); - addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration()); - addField(builder, "bytesRead", status.getBytesRead()); - addField(builder, "bytesWritten", status.getBytesWritten()); - addField(builder, "bytesReceived", status.getBytesReceived()); - addField(builder, "bytesSent", status.getBytesSent()); - addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved()); - addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); - addField(builder, "flowFilesSent", status.getFlowFilesSent()); - addField(builder, "inputCount", status.getInputCount()); - addField(builder, "inputBytes", status.getInputBytes()); - addField(builder, "outputCount", status.getOutputCount()); - addField(builder, "outputBytes", status.getOutputBytes()); - addField(builder, "activeThreadCount", status.getActiveThreadCount()); - addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount()); - addField(builder, "invocations", status.getInvocations()); - addField(builder, "processingNanos", status.getProcessingNanos()); - addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name()); - addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name()); - addField(builder, factory, "counters", status.getCounters()); + addField(builder, "componentId", status.getId(), allowNullValues); + addField(builder, "processorType", status.getType(), allowNullValues); + addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration(), allowNullValues); + addField(builder, "bytesRead", status.getBytesRead(), allowNullValues); + addField(builder, "bytesWritten", status.getBytesWritten(), allowNullValues); + addField(builder, "bytesReceived", status.getBytesReceived(), allowNullValues); + addField(builder, "bytesSent", status.getBytesSent(), allowNullValues); + addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved(), allowNullValues); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived(), allowNullValues); + addField(builder, "flowFilesSent", status.getFlowFilesSent(), allowNullValues); + addField(builder, "inputCount", status.getInputCount(), allowNullValues); + addField(builder, "inputBytes", status.getInputBytes(), allowNullValues); + addField(builder, "outputCount", status.getOutputCount(), allowNullValues); + addField(builder, "outputBytes", status.getOutputBytes(), allowNullValues); + addField(builder, "activeThreadCount", status.getActiveThreadCount(), allowNullValues); + addField(builder, "terminatedThreadCount", status.getTerminatedThreadCount(), allowNullValues); + addField(builder, "invocations", status.getInvocations(), allowNullValues); + addField(builder, "processingNanos", status.getProcessingNanos(), allowNullValues); + addField(builder, "runStatus", status.getRunStatus() == null ? null : status.getRunStatus().name(), allowNullValues); + addField(builder, "executionNode", status.getExecutionNode() == null ? null : status.getExecutionNode().name(), allowNullValues); + addField(builder, factory, "counters", status.getCounters(), allowNullValues); arrayBuilder.add(builder.build()); } @@ -410,40 +415,40 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, - final String componentType, final String componentName) { - addField(builder, "statusId", UUID.randomUUID().toString()); - addField(builder, "timestampMillis", currentDate.getTime()); - addField(builder, "timestamp", df.format(currentDate)); - addField(builder, "actorHostname", hostname); - addField(builder, "componentType", componentType); - addField(builder, "componentName", componentName); - addField(builder, "parentId", parentId); - addField(builder, "platform", platform); - addField(builder, "application", applicationName); + final String componentType, final String componentName, Boolean allowNullValues) { + addField(builder, "statusId", UUID.randomUUID().toString(), allowNullValues); + addField(builder, "timestampMillis", currentDate.getTime(), allowNullValues); + addField(builder, "timestamp", df.format(currentDate), allowNullValues); + addField(builder, "actorHostname", hostname, allowNullValues); + addField(builder, "componentType", componentType, allowNullValues); + addField(builder, "componentName", componentName, allowNullValues); + addField(builder, "parentId", parentId, allowNullValues); + addField(builder, "platform", platform, allowNullValues); + addField(builder, "application", applicationName, allowNullValues); } - private void addField(final JsonObjectBuilder builder, final String key, final Boolean value) { - if (value == null) { - return; - } - builder.add(key, value); - } + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values, final Boolean allowNullValues) { - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values) { - if (values == null) { - return; - } + if (values != null) { - final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); - for (final Map.Entry entry : values.entrySet()) { - if (entry.getKey() == null || entry.getValue() == null) { - continue; + final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); + for (final Map.Entry entry : values.entrySet()) { + + if (entry.getKey() == null ) { + continue; + }else if(entry.getValue() == null ){ + if(allowNullValues) + mapBuilder.add(entry.getKey(),JsonValue.NULL); + }else{ + mapBuilder.add(entry.getKey(), entry.getValue()); + } } - mapBuilder.add(entry.getKey(), entry.getValue()); - } + builder.add(key, mapBuilder); - builder.add(key, mapBuilder); + }else if(allowNullValues){ + builder.add(key,JsonValue.NULL); + } } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index f74ad4a7dd..34a086671c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -17,6 +17,25 @@ package org.apache.nifi.reporting; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; + import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -33,22 +52,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - public class TestSiteToSiteBulletinReportingTask { @Test @@ -120,6 +123,54 @@ public class TestSiteToSiteBulletinReportingTask { JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0); assertEquals("message", bulletinJson.getString("bulletinMessage")); assertEquals("group-name", bulletinJson.getString("bulletinGroupName")); + assertNull( bulletinJson.get("bulletinSourceType")); + } + + @Test + public void testSerializedFormWithNullValues() throws IOException, InitializationException { + // creating the list of bulletins + final List bulletins = new ArrayList(); + bulletins.add(BulletinFactory.createBulletin("group-id", "group-name", "source-id", "source-name", "category", "severity", "message")); + + // mock the access to the list of bulletins + final ReportingContext context = Mockito.mock(ReportingContext.class); + final BulletinRepository repository = Mockito.mock(BulletinRepository.class); + Mockito.when(context.getBulletinRepository()).thenReturn(repository); + Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); + + // creating reporting task + final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); + + // settings properties and mocking access to properties + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + + Mockito.doAnswer(new Answer() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + // setup the mock initialization context + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + task.initialize(initContext); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject bulletinJson = jsonReader.readArray().getJsonObject(0); + assertEquals(JsonValue.NULL, bulletinJson.get("bulletinSourceType")); } private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java index 8cebf91dae..dbfc5f757c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java @@ -74,7 +74,7 @@ public class TestSiteToSiteMetricsReportingTask { status.setBytesSent(20000); status.setQueuedCount(100); status.setQueuedContentSize(1024L); - status.setBytesRead(60000L); + status.setBytesRead(null); status.setBytesWritten(80000L); status.setActiveThreadCount(5); @@ -241,6 +241,34 @@ public class TestSiteToSiteMetricsReportingTask { fail(); } + @Test + public void testAmbariFormatWithNullValues() throws IOException, InitializationException { + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue()); + properties.put(SiteToSiteMetricsReportingTask.ALLOW_NULL_VALUES, "true"); + + MockSiteToSiteMetricsReportingTask task = initTask(properties); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonArray array = jsonReader.readObject().getJsonArray("metrics"); + for(int i = 0; i < array.size(); i++) { + JsonObject object = array.getJsonObject(i); + assertEquals("nifi", object.getString("appid")); + assertEquals("1234", object.getString("instanceid")); + if(object.getString("metricname").equals("BytesReadLast5Minutes")) { + for(Entry kv : object.getJsonObject("metrics").entrySet()) { + assertEquals("\"null\"", kv.getValue().toString()); + } + return; + } + } + fail(); + } + @Test public void testRecordFormat() throws IOException, InitializationException { final Map properties = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index 6c16b23ed1..b5805a6095 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -18,8 +18,26 @@ package org.apache.nifi.reporting; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; + import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.Scope; @@ -46,22 +64,8 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; public class TestSiteToSiteProvenanceReportingTask { @@ -210,10 +214,38 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(3, task.dataSent.size()); final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); - JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes"); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonValue details = object.get("details"); + JsonObject msgArray = object.getJsonObject("updatedAttributes"); + assertNull(details); assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc")); + assertNull(msgArray.get("emptyVal")); } + @Test + public void testSerializedFormWithNullValues() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + + ProvenanceEventRecord event = createProvenanceEventRecord(); + + MockSiteToSiteProvenanceReportingTask task = setup(event, properties); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonValue details = object.get("details"); + JsonValue emptyVal = object.getJsonObject("updatedAttributes").get("emptyVal"); + assertEquals(JsonValue.NULL,details); + assertEquals(JsonValue.NULL,emptyVal); + } @Test public void testFilterComponentIdSuccess() throws IOException, InitializationException { final Map properties = new HashMap<>(); @@ -618,6 +650,7 @@ public class TestSiteToSiteProvenanceReportingTask { attributes.put("abc", "xyz"); attributes.put("xyz", "abc"); attributes.put("filename", "file-" + uuid); + attributes.put("emptyVal",null); final Map prevAttrs = new HashMap<>(); attributes.put("filename", "1234.xyz"); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 46be7763b8..44ac8eb89d 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -21,6 +21,7 @@ package org.apache.nifi.reporting; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -37,6 +38,7 @@ import javax.json.JsonNumber; import javax.json.JsonObject; import javax.json.JsonReader; import javax.json.JsonString; +import javax.json.JsonValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -165,8 +167,30 @@ public class TestSiteToSiteStatusReportingTask { JsonNumber bytesThreshold = object.getJsonNumber("backPressureBytesThreshold"); assertEquals("1 KB", dataSizeThreshold.getString()); assertEquals(1024, bytesThreshold.intValue()); + assertNull(object.get("destinationName")); } + @Test + public void testConnectionStatusWithNullValues() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonValue destination = object.get("destinationName"); + assertEquals(destination, JsonValue.NULL); + + } + @Test public void testComponentNameFilter() throws IOException, InitializationException { final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); @@ -207,12 +231,13 @@ public class TestSiteToSiteStatusReportingTask { @Test public void testPortStatus() throws IOException, InitializationException { - final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); final Map properties = new HashMap<>(); properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"false"); MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); task.onTrigger(context); @@ -226,6 +251,26 @@ public class TestSiteToSiteStatusReportingTask { assertFalse(isTransmitting); JsonNumber inputBytes = object.getJsonNumber("inputBytes"); assertEquals(5, inputBytes.intValue()); + assertNull(object.get("activeThreadCount")); + } + + @Test + public void testPortStatusWithNullValues() throws IOException, InitializationException { + ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(InputPort)"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonValue activeThreadCount = object.get("activeThreadCount"); + assertEquals(activeThreadCount, JsonValue.NULL); } @Test @@ -250,6 +295,27 @@ public class TestSiteToSiteStatusReportingTask { assertEquals("Transmitting", transmissionStatus.getString()); } + @Test + public void testRemoteProcessGroupStatusWithNullValues() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(RemoteProcessGroup)"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(3, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject firstElement = jsonReader.readArray().getJsonObject(0); + JsonValue targetURI = firstElement.get("targetURI"); + assertEquals(targetURI, JsonValue.NULL); + } + @Test public void testProcessorStatus() throws IOException, InitializationException { final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); @@ -273,6 +339,27 @@ public class TestSiteToSiteStatusReportingTask { assertNotNull(counterMap); assertEquals(10, counterMap.getInt("counter1")); assertEquals(5, counterMap.getInt("counter2")); + assertNull(object.get("processorType")); + } + + @Test + public void testProcessorStatusWithNullValues() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Processor)"); + properties.put(SiteToSiteStatusReportingTask.ALLOW_NULL_VALUES,"true"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonValue type = object.get("processorType"); + assertEquals(type, JsonValue.NULL); } /*********************************** @@ -319,7 +406,6 @@ public class TestSiteToSiteStatusReportingTask { pgStatus.setRemoteProcessGroupStatus(rpgStatus); pgStatus.setProcessorStatus(pStatus); pgStatus.setVersionedFlowState(VersionedFlowState.UP_TO_DATE); - pgStatus.setActiveThreadCount(1); pgStatus.setBytesRead(2L); pgStatus.setBytesReceived(3l); @@ -345,7 +431,7 @@ public class TestSiteToSiteStatusReportingTask { PortStatus pStatus = new PortStatus(); pStatus.setId(id); pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); - pStatus.setActiveThreadCount(0); + pStatus.setActiveThreadCount(null); pStatus.setBytesReceived(1l); pStatus.setBytesSent(2l); pStatus.setFlowFilesReceived(3); @@ -379,7 +465,7 @@ public class TestSiteToSiteStatusReportingTask { pStatus.setOutputBytes(12l); pStatus.setOutputCount(13); pStatus.setProcessingNanos(14l); - pStatus.setType("type"); + pStatus.setType(null); pStatus.setTerminatedThreadCount(1); pStatus.setRunStatus(RunStatus.Running); pStatus.setCounters(new HashMap() {{ @@ -402,7 +488,7 @@ public class TestSiteToSiteStatusReportingTask { rpgStatus.setReceivedCount(5); rpgStatus.setSentContentSize(6l); rpgStatus.setSentCount(7); - rpgStatus.setTargetUri("uri"); + rpgStatus.setTargetUri(null); rpgStatus.setTransmissionStatus(TransmissionStatus.Transmitting); return rpgStatus; @@ -425,7 +511,7 @@ public class TestSiteToSiteStatusReportingTask { cStatus.setSourceId(id); cStatus.setSourceName("source"); cStatus.setDestinationId(id); - cStatus.setDestinationName("destination"); + cStatus.setDestinationName(null); return cStatus; }