From 1d21e3baf85ec622fd9d0fb9d3f18c27802a0978 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 27 Apr 2018 17:41:39 +0200 Subject: [PATCH] NIFI-5122 - Add Record Writer for S2S RTs This closes #2663 Signed-off-by: Mike Thomsen --- .../nifi-site-to-site-reporting-task/pom.xml | 3 + .../AbstractSiteToSiteReportingTask.java | 21 ++- .../SiteToSiteBulletinReportingTask.java | 13 +- .../SiteToSiteMetricsReportingTask.java | 1 - .../SiteToSiteProvenanceReportingTask.java | 12 +- .../SiteToSiteStatusReportingTask.java | 14 +- .../additionalDetails.html | 65 ++++++++++ .../additionalDetails.html | 9 +- .../additionalDetails.html | 122 ++++++++++++++++++ .../src/main/resources/schema-bulletins.avsc | 21 +++ .../src/main/resources/schema-provenance.avsc | 35 +++++ .../src/main/resources/schema-status.avsc | 78 +++++++++++ .../TestSiteToSiteBulletinReportingTask.java | 4 + .../TestSiteToSiteMetricsReportingTask.java | 3 - ...TestSiteToSiteProvenanceReportingTask.java | 4 + .../TestSiteToSiteStatusReportingTask.java | 6 +- 16 files changed, 393 insertions(+), 18 deletions(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index 93a3196ea8..d60893ebaf 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -120,6 +120,9 @@ src/main/resources/schema-metrics.avsc + src/main/resources/schema-bulletins.avsc + src/main/resources/schema-provenance.avsc + src/main/resources/schema-status.avsc diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index e7553547cc..21bb3974c2 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.reporting; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +31,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import javax.json.JsonArray; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; import javax.net.ssl.SSLContext; @@ -46,6 +49,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; @@ -208,6 +212,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT properties.add(HTTP_PROXY_PORT); properties.add(HTTP_PROXY_USERNAME); properties.add(HTTP_PROXY_PASSWORD); + properties.add(RECORD_WRITER); return properties; } @@ -264,6 +269,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT return this.siteToSiteClient; } + protected void sendData(final ReportingContext context, final Transaction transaction, Map attributes, final JsonArray jsonArray) throws IOException { + if(context.getProperty(RECORD_WRITER).isSet()) { + transaction.send(getData(context, new ByteArrayInputStream(jsonArray.toString().getBytes(StandardCharsets.UTF_8)), attributes), attributes); + } else { + transaction.send(jsonArray.toString().getBytes(StandardCharsets.UTF_8), attributes); + } + } + protected byte[] getData(final ReportingContext context, InputStream in, Map attributes) { try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) { @@ -387,8 +400,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT if (firstObjectConsumed && !array) { return null; } + + JsonNode nextNode = getNextJsonNode(); + if(nextNode == null) { + return null; + } + try { - return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields); + return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields); } catch (final MalformedRecordException mre) { throw mre; } catch (final IOException ioe) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index d026aa1ecf..ac60d8a092 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -17,11 +17,13 @@ package org.apache.nifi.reporting; +import org.apache.avro.Schema; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -37,8 +39,9 @@ import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; + import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -76,6 +79,11 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting private volatile long lastSentBulletinId = -1L; + public SiteToSiteBulletinReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-bulletins.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); @@ -153,8 +161,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("mime.type", "application/json"); - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + sendData(context, transaction, attributes, jsonArray); transaction.confirm(); transaction.complete(); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 20416e1fd3..e17c2c8fe6 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -113,7 +113,6 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT properties.add(HOSTNAME); properties.add(APPLICATION_ID); properties.add(FORMAT); - properties.add(RECORD_WRITER); properties.remove(BATCH_SIZE); return properties; } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index ec1414d343..ec45c59665 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -17,6 +17,7 @@ package org.apache.nifi.reporting; +import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; @@ -25,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; @@ -47,9 +49,9 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -160,6 +162,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti private volatile ProvenanceEventConsumer consumer; + public SiteToSiteProvenanceReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-provenance.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @OnScheduled public void onScheduled(final ConfigurationContext context) throws IOException { consumer = new ProvenanceEventConsumer(); @@ -287,8 +294,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("mime.type", "application/json"); - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + sendData(context, transaction, attributes, jsonArray); transaction.confirm(); transaction.complete(); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index c419d5c587..618c40ca91 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -18,9 +18,9 @@ package org.apache.nifi.reporting; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -41,8 +41,10 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; +import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; @@ -92,6 +94,11 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa private volatile Pattern componentTypeFilter; private volatile Pattern componentNameFilter; + public SiteToSiteStatusReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); @@ -168,10 +175,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa for(JsonValue jsonValue : jsonBatch) { jsonBatchArrayBuilder.add(jsonValue); } - final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build(); - final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build(); + sendData(context, transaction, attributes, jsonBatchArray); transaction.confirm(); transaction.complete(); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html new file mode 100644 index 0000000000..c76c138fb2 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html @@ -0,0 +1,65 @@ + + + + + + SiteToSiteBulletinReportingTask + + + + + +

+ The Site-to-Site Bulletin Reporting Task allows the user to publish Bulletin events using the Site To Site protocol. Note: + only up to 5 bulletins are stored per component and up to 10 bulletins at controller level for a duration of up to 5 minutes. + If this reporting task is not scheduled frequently enough some bulletins may not be sent. +

+ +

Record writer

+ +

+ The user can define a Record Writer and directly specify the output format and data with the assumption that the input schema + is the following: +

+ +
+			
+{
+  "type" : "record",
+  "name" : "bulletins",
+  "namespace" : "bulletins",
+  "fields" : [ 
+	{ "name" : "objectId", "type" : "string" },
+	{ "name" : "platform", "type" : "string" },
+	{ "name" : "bulletinId", "type" : "long" },
+	{ "name" : "bulletinCategory", "type" : ["string", "null"] },
+	{ "name" : "bulletinGroupId", "type" : ["string", "null"] },
+	{ "name" : "bulletinGroupName", "type" : ["string", "null"] },
+	{ "name" : "bulletinLevel", "type" : ["string", "null"] },
+	{ "name" : "bulletinMessage", "type" : ["string", "null"] },
+	{ "name" : "bulletinNodeAddress", "type" : ["string", "null"] },
+	{ "name" : "bulletinNodeId", "type" : ["string", "null"] },
+	{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
+	{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
+	{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
+	{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
+  ]
+}
+			
+		
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html index 86736a6616..676674eb85 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html @@ -33,8 +33,8 @@

- When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using - a schema. As such, the schema for this Provenance data can be defined as follows: + By default, when published to a NiFi instance, the Provenance data is sent as a JSON array. However, the user can define a Record Writer + and directly specify the output format and data with the assumption that the input schema is defined as follows:

@@ -53,6 +53,9 @@
     { "name": "details", "type": "string" },
     { "name": "componentId", "type": "string" },
     { "name": "componentType", "type": "string" },
+    { "name": "componentName", "type": "string" },
+    { "name": "processGroupId", "type": "string" },
+    { "name": "processGroupName", "type": "string" },
     { "name": "entityId", "type": "string" },
     { "name": "entityType", "type": "string" },
     { "name": "entitySize", "type": ["null", "long"] },
@@ -66,6 +69,8 @@
     { "name": "childIds", "type": { "type": "array", "items": "string" } },
     { "name": "platform", "type": "string" },
     { "name": "application", "type": "string" },
+    { "name": "remoteIdentifier", "type": "string" },
+    { "name": "alternateIdentifier", "type": "string" },
     { "name": "transitUri", "type": ["null", "string"] }
   ]
 }
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
new file mode 100644
index 0000000000..2d0be38717
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
@@ -0,0 +1,122 @@
+
+
+    
+    
+        
+        SiteToSiteStatusReportingTask
+
+        
+    
+
+    
+    	

+ The Site-to-Site Status Reporting Task allows the user to publish Status events using the Site To Site protocol. + The component type and name filter regexes form a union: only components matching both regexes will be reported. + However, all process groups are recursively searched for matching components, regardless of whether the process + group matches the component filters. +

+ +

Record writer

+ +

+ The user can define a Record Writer and directly specify the output format and data with the assumption that the + input schema is the following: +

+ +
+			
+{
+  "type" : "record",
+  "name" : "status",
+  "namespace" : "status",
+  "fields" : [
+    // common fields for all components
+	{ "name" : "statusId", "type" : "string"},
+	{ "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
+	{ "name" : "timestamp", "type" : "string"},
+	{ "name" : "actorHostname", "type" : "string"},
+	{ "name" : "componentType", "type" : "string"},
+	{ "name" : "componentName", "type" : "string"},
+	{ "name" : "parentId", "type" : ["string", "null"]},
+	{ "name" : "platform", "type" : "string"},
+	{ "name" : "application", "type" : "string"},
+	{ "name" : "componentId", "type" : "string"},
+	
+	// PG + RPG + Ports + Processors
+	{ "name" : "activeThreadCount", "type" : ["long", "null"]},
+	
+	// PG + Ports + Processors
+	{ "name" : "flowFilesReceived", "type" : ["long", "null"]},
+	{ "name" : "flowFilesSent", "type" : ["long", "null"]},
+	
+	// PG + Ports + Processors
+	{ "name" : "bytesReceived", "type" : ["long", "null"]},
+	{ "name" : "bytesSent", "type" : ["long", "null"]},
+	
+	// PG + Connections
+	{ "name" : "queuedCount", "type" : ["long", "null"]},
+	
+	// PG + Processors
+	{ "name" : "bytesRead", "type" : ["long", "null"]},
+	{ "name" : "bytesWritten", "type" : ["long", "null"]},
+	
+	// fields for process group status
+	{ "name" : "bytesTransferred", "type" : ["long", "null"]},
+	{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
+	{ "name" : "inputContentSize", "type" : ["long", "null"]},
+	{ "name" : "outputContentSize", "type" : ["long", "null"]},
+	{ "name" : "queuedContentSize", "type" : ["long", "null"]},
+	
+	// fields for remote process groups
+	{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
+	{ "name" : "inactiveRemotePortCount", "type" : ["long", "null"]},
+	{ "name" : "receivedContentSize", "type" : ["long", "null"]},
+	{ "name" : "receivedCount", "type" : ["long", "null"]},
+	{ "name" : "sentContentSize", "type" : ["long", "null"]},
+	{ "name" : "sentCount", "type" : ["long", "null"]},
+	{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
+	
+	// fields for input/output ports + connections + PG
+	{ "name" : "inputBytes", "type" : ["long", "null"]},
+	{ "name" : "inputCount", "type" : ["long", "null"]},
+	{ "name" : "outputBytes", "type" : ["long", "null"]},
+	{ "name" : "outputCount", "type" : ["long", "null"]},
+	
+	// fields for connections
+	{ "name" : "sourceId", "type" : ["string", "null"]},
+	{ "name" : "sourceName", "type" : ["string", "null"]},
+	{ "name" : "destinationId", "type" : ["string", "null"]},
+	{ "name" : "destinationName", "type" : ["string", "null"]},
+	{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
+	{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
+	{ "name" : "queuedBytes", "type" : ["long", "null"]},
+	{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
+	{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
+	{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
+	
+    // fields for processors
+	{ "name" : "processorType", "type" : ["string", "null"]},
+	{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
+	{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
+	{ "name" : "invocations", "type" : ["long", "null"]},
+	{ "name" : "processingNanos", "type" : ["long", "null"]}
+  ]
+}
+			
+		
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc new file mode 100644 index 0000000000..01b0f33ab2 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc @@ -0,0 +1,21 @@ +{ + "type" : "record", + "name" : "bulletins", + "namespace" : "bulletins", + "fields" : [ + { "name" : "objectId", "type" : "string" }, + { "name" : "platform", "type" : "string" }, + { "name" : "bulletinId", "type" : "long" }, + { "name" : "bulletinCategory", "type" : ["string", "null"] }, + { "name" : "bulletinGroupId", "type" : ["string", "null"] }, + { "name" : "bulletinGroupName", "type" : ["string", "null"] }, + { "name" : "bulletinLevel", "type" : ["string", "null"] }, + { "name" : "bulletinMessage", "type" : ["string", "null"] }, + { "name" : "bulletinNodeAddress", "type" : ["string", "null"] }, + { "name" : "bulletinNodeId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceName", "type" : ["string", "null"] }, + { "name" : "bulletinSourceType", "type" : ["string", "null"] }, + { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" } + ] +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc new file mode 100644 index 0000000000..840bde607d --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc @@ -0,0 +1,35 @@ +{ + "namespace": "nifi", + "name": "provenanceEvent", + "type": "record", + "fields": [ + { "name": "eventId", "type": "string" }, + { "name": "eventOrdinal", "type": "long" }, + { "name": "eventType", "type": "string" }, + { "name": "timestampMillis", "type": "long" }, + { "name": "durationMillis", "type": "long" }, + { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } }, + { "name": "details", "type": "string" }, + { "name": "componentId", "type": "string" }, + { "name": "componentType", "type": "string" }, + { "name": "componentName", "type": "string" }, + { "name": "processGroupId", "type": "string" }, + { "name": "processGroupName", "type": "string" }, + { "name": "entityId", "type": "string" }, + { "name": "entityType", "type": "string" }, + { "name": "entitySize", "type": ["null", "long"] }, + { "name": "previousEntitySize", "type": ["null", "long"] }, + { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } }, + { "name": "previousAttributes", "type": { "type": "map", "values": "string" } }, + { "name": "actorHostname", "type": "string" }, + { "name": "contentURI", "type": "string" }, + { "name": "previousContentURI", "type": "string" }, + { "name": "parentIds", "type": { "type": "array", "items": "string" } }, + { "name": "childIds", "type": { "type": "array", "items": "string" } }, + { "name": "platform", "type": "string" }, + { "name": "application", "type": "string" }, + { "name": "remoteIdentifier", "type": "string" }, + { "name": "alternateIdentifier", "type": "string" }, + { "name": "transitUri", "type": ["null", "string"] } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc new file mode 100644 index 0000000000..6f16d0e91f --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc @@ -0,0 +1,78 @@ +{ + "type" : "record", + "name" : "status", + "namespace" : "status", + "fields" : [ + + // common fields for all components + { "name" : "statusId", "type" : "string"}, + { "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } }, + { "name" : "timestamp", "type" : "string"}, + { "name" : "actorHostname", "type" : "string"}, + { "name" : "componentType", "type" : "string"}, + { "name" : "componentName", "type" : "string"}, + { "name" : "parentId", "type" : ["string", "null"]}, + { "name" : "platform", "type" : "string"}, + { "name" : "application", "type" : "string"}, + { "name" : "componentId", "type" : "string"}, + + // PG + RPG + Ports + Processors + { "name" : "activeThreadCount", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "flowFilesReceived", "type" : ["long", "null"]}, + { "name" : "flowFilesSent", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "bytesReceived", "type" : ["long", "null"]}, + { "name" : "bytesSent", "type" : ["long", "null"]}, + + // PG + Connections + { "name" : "queuedCount", "type" : ["long", "null"]}, + + // PG + Processors + { "name" : "bytesRead", "type" : ["long", "null"]}, + { "name" : "bytesWritten", "type" : ["long", "null"]}, + + // fields for process group status + { "name" : "bytesTransferred", "type" : ["long", "null"]}, + { "name" : "flowFilesTransferred", "type" : ["long", "null"]}, + { "name" : "inputContentSize", "type" : ["long", "null"]}, + { "name" : "outputContentSize", "type" : ["long", "null"]}, + { "name" : "queuedContentSize", "type" : ["long", "null"]}, + + // fields for remote process groups + { "name" : "activeRemotePortCount", "type" : ["long", "null"]}, + { "name" : "inactiveRemotePortCount", "type" : ["long", "null"]}, + { "name" : "receivedContentSize", "type" : ["long", "null"]}, + { "name" : "receivedCount", "type" : ["long", "null"]}, + { "name" : "sentContentSize", "type" : ["long", "null"]}, + { "name" : "sentCount", "type" : ["long", "null"]}, + { "name" : "averageLineageDuration", "type" : ["long", "null"]}, + + // fields for input/output ports + connections + PG + { "name" : "inputBytes", "type" : ["long", "null"]}, + { "name" : "inputCount", "type" : ["long", "null"]}, + { "name" : "outputBytes", "type" : ["long", "null"]}, + { "name" : "outputCount", "type" : ["long", "null"]}, + + // fields for connections + { "name" : "sourceId", "type" : ["string", "null"]}, + { "name" : "sourceName", "type" : ["string", "null"]}, + { "name" : "destinationId", "type" : ["string", "null"]}, + { "name" : "destinationName", "type" : ["string", "null"]}, + { "name" : "maxQueuedBytes", "type" : ["long", "null"]}, + { "name" : "maxQueuedCount", "type" : ["long", "null"]}, + { "name" : "queuedBytes", "type" : ["long", "null"]}, + { "name" : "backPressureBytesThreshold", "type" : ["long", "null"]}, + { "name" : "backPressureObjectThreshold", "type" : ["long", "null"]}, + { "name" : "isBackPressureEnabled", "type" : ["string", "null"]}, + + // fields for processors + { "name" : "processorType", "type" : ["string", "null"]}, + { "name" : "averageLineageDurationMS", "type" : ["long", "null"]}, + { "name" : "flowFilesRemoved", "type" : ["long", "null"]}, + { "name" : "invocations", "type" : ["long", "null"]}, + { "name" : "processingNanos", "type" : ["long", "null"]} + ] +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index 6d70442a0b..4f6bd5f525 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -125,6 +125,10 @@ public class TestSiteToSiteBulletinReportingTask { private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { + public MockSiteToSiteBulletinReportingTask() throws IOException { + super(); + } + final List dataSent = new ArrayList<>(); @Override diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java index c699a1c883..e4f24cb6f6 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java @@ -17,7 +17,6 @@ package org.apache.nifi.reporting; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -53,7 +52,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; -import org.apache.nifi.util.TestRunner; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -65,7 +63,6 @@ public class TestSiteToSiteMetricsReportingTask { private ReportingContext context; private ProcessGroupStatus status; - private TestRunner runner; @Before public void setup() { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index 31054c2471..d39df59c9f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -604,6 +604,10 @@ public class TestSiteToSiteProvenanceReportingTask { private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask { + public MockSiteToSiteProvenanceReportingTask() throws IOException { + super(); + } + final List dataSent = new ArrayList<>(); @Override diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 79ba213284..6fe795f537 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -60,7 +60,7 @@ public class TestSiteToSiteStatusReportingTask { private ReportingContext context; public MockSiteToSiteStatusReportingTask initTask(Map customProperties, - ProcessGroupStatus pgStatus) throws InitializationException { + ProcessGroupStatus pgStatus) throws InitializationException, IOException { final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask(); Map properties = new HashMap<>(); for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { @@ -338,6 +338,10 @@ public class TestSiteToSiteStatusReportingTask { private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask { + public MockSiteToSiteStatusReportingTask() throws IOException { + super(); + } + final List dataSent = new ArrayList<>(); @Override