NIFI-5122 - Add Record Writer for S2S RTs

This closes #2663

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Pierre Villard 2018-04-27 17:41:39 +02:00 committed by Mike Thomsen
parent 1663a6c094
commit 1d21e3baf8
16 changed files with 393 additions and 18 deletions

View File

@ -120,6 +120,9 @@
<configuration> <configuration>
<excludes combine.children="append"> <excludes combine.children="append">
<exclude>src/main/resources/schema-metrics.avsc</exclude> <exclude>src/main/resources/schema-metrics.avsc</exclude>
<exclude>src/main/resources/schema-bulletins.avsc</exclude>
<exclude>src/main/resources/schema-provenance.avsc</exclude>
<exclude>src/main/resources/schema-status.avsc</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -16,9 +16,11 @@
*/ */
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -29,6 +31,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.json.JsonArray;
import javax.json.JsonObjectBuilder; import javax.json.JsonObjectBuilder;
import javax.json.JsonValue; import javax.json.JsonValue;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -46,6 +49,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.protocol.http.HttpProxy;
@ -208,6 +212,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
properties.add(HTTP_PROXY_PORT); properties.add(HTTP_PROXY_PORT);
properties.add(HTTP_PROXY_USERNAME); properties.add(HTTP_PROXY_USERNAME);
properties.add(HTTP_PROXY_PASSWORD); properties.add(HTTP_PROXY_PASSWORD);
properties.add(RECORD_WRITER);
return properties; return properties;
} }
@ -264,6 +269,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
return this.siteToSiteClient; return this.siteToSiteClient;
} }
protected void sendData(final ReportingContext context, final Transaction transaction, Map<String, String> attributes, final JsonArray jsonArray) throws IOException {
if(context.getProperty(RECORD_WRITER).isSet()) {
transaction.send(getData(context, new ByteArrayInputStream(jsonArray.toString().getBytes(StandardCharsets.UTF_8)), attributes), attributes);
} else {
transaction.send(jsonArray.toString().getBytes(StandardCharsets.UTF_8), attributes);
}
}
protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) { protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) { try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) {
@ -387,8 +400,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
if (firstObjectConsumed && !array) { if (firstObjectConsumed && !array) {
return null; return null;
} }
JsonNode nextNode = getNextJsonNode();
if(nextNode == null) {
return null;
}
try { try {
return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields); return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields);
} catch (final MalformedRecordException mre) { } catch (final MalformedRecordException mre) {
throw mre; throw mre;
} catch (final IOException ioe) { } catch (final IOException ioe) {

View File

@ -17,11 +17,13 @@
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
@ -37,8 +39,9 @@ import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory; import javax.json.JsonBuilderFactory;
import javax.json.JsonObject; import javax.json.JsonObject;
import javax.json.JsonObjectBuilder; import javax.json.JsonObjectBuilder;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.io.InputStream;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
@ -76,6 +79,11 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
private volatile long lastSentBulletinId = -1L; private volatile long lastSentBulletinId = -1L;
public SiteToSiteBulletinReportingTask() throws IOException {
final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-bulletins.avsc");
recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -153,8 +161,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("reporting.task.type", this.getClass().getSimpleName());
attributes.put("mime.type", "application/json"); attributes.put("mime.type", "application/json");
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); sendData(context, transaction, attributes, jsonArray);
transaction.send(data, attributes);
transaction.confirm(); transaction.confirm();
transaction.complete(); transaction.complete();

View File

@ -113,7 +113,6 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT
properties.add(HOSTNAME); properties.add(HOSTNAME);
properties.add(APPLICATION_ID); properties.add(APPLICATION_ID);
properties.add(FORMAT); properties.add(FORMAT);
properties.add(RECORD_WRITER);
properties.remove(BATCH_SIZE); properties.remove(BATCH_SIZE);
return properties; return properties;
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.behavior.Restriction;
@ -25,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.RequiredPermission;
@ -47,9 +49,9 @@ import javax.json.JsonBuilderFactory;
import javax.json.JsonObject; import javax.json.JsonObject;
import javax.json.JsonObjectBuilder; import javax.json.JsonObjectBuilder;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
@ -160,6 +162,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
private volatile ProvenanceEventConsumer consumer; private volatile ProvenanceEventConsumer consumer;
public SiteToSiteProvenanceReportingTask() throws IOException {
final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-provenance.avsc");
recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
}
@OnScheduled @OnScheduled
public void onScheduled(final ConfigurationContext context) throws IOException { public void onScheduled(final ConfigurationContext context) throws IOException {
consumer = new ProvenanceEventConsumer(); consumer = new ProvenanceEventConsumer();
@ -287,8 +294,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("reporting.task.type", this.getClass().getSimpleName());
attributes.put("mime.type", "application/json"); attributes.put("mime.type", "application/json");
final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); sendData(context, transaction, attributes, jsonArray);
transaction.send(data, attributes);
transaction.confirm(); transaction.confirm();
transaction.complete(); transaction.complete();

View File

@ -18,9 +18,9 @@
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
@ -41,8 +41,10 @@ import javax.json.JsonBuilderFactory;
import javax.json.JsonObjectBuilder; import javax.json.JsonObjectBuilder;
import javax.json.JsonValue; import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.PortStatus;
@ -92,6 +94,11 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
private volatile Pattern componentTypeFilter; private volatile Pattern componentTypeFilter;
private volatile Pattern componentNameFilter; private volatile Pattern componentNameFilter;
public SiteToSiteStatusReportingTask() throws IOException {
final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc");
recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -168,10 +175,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
for(JsonValue jsonValue : jsonBatch) { for(JsonValue jsonValue : jsonBatch) {
jsonBatchArrayBuilder.add(jsonValue); jsonBatchArrayBuilder.add(jsonValue);
} }
final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build();
final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8); final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build();
transaction.send(data, attributes); sendData(context, transaction, attributes, jsonBatchArray);
transaction.confirm(); transaction.confirm();
transaction.complete(); transaction.complete();

View File

@ -0,0 +1,65 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>SiteToSiteBulletinReportingTask</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>
The Site-to-Site Bulletin Reporting Task allows the user to publish Bulletin events using the Site To Site protocol. Note:
only up to 5 bulletins are stored per component and up to 10 bulletins at controller level for a duration of up to 5 minutes.
If this reporting task is not scheduled frequently enough some bulletins may not be sent.
</p>
<h2>Record writer</h2>
<p>
The user can define a Record Writer and directly specify the output format and data with the assumption that the input schema
is the following:
</p>
<pre>
<code>
{
"type" : "record",
"name" : "bulletins",
"namespace" : "bulletins",
"fields" : [
{ "name" : "objectId", "type" : "string" },
{ "name" : "platform", "type" : "string" },
{ "name" : "bulletinId", "type" : "long" },
{ "name" : "bulletinCategory", "type" : ["string", "null"] },
{ "name" : "bulletinGroupId", "type" : ["string", "null"] },
{ "name" : "bulletinGroupName", "type" : ["string", "null"] },
{ "name" : "bulletinLevel", "type" : ["string", "null"] },
{ "name" : "bulletinMessage", "type" : ["string", "null"] },
{ "name" : "bulletinNodeAddress", "type" : ["string", "null"] },
{ "name" : "bulletinNodeId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
]
}
</code>
</pre>
</body>
</html>

View File

@ -33,8 +33,8 @@
</p> </p>
<p> <p>
When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using By default, when published to a NiFi instance, the Provenance data is sent as a JSON array. However, the user can define a Record Writer
a schema. As such, the schema for this Provenance data can be defined as follows: and directly specify the output format and data with the assumption that the input schema is defined as follows:
</p> </p>
<pre> <pre>
@ -53,6 +53,9 @@
{ "name": "details", "type": "string" }, { "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" }, { "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" }, { "name": "componentType", "type": "string" },
{ "name": "componentName", "type": "string" },
{ "name": "processGroupId", "type": "string" },
{ "name": "processGroupName", "type": "string" },
{ "name": "entityId", "type": "string" }, { "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" }, { "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] }, { "name": "entitySize", "type": ["null", "long"] },
@ -66,6 +69,8 @@
{ "name": "childIds", "type": { "type": "array", "items": "string" } }, { "name": "childIds", "type": { "type": "array", "items": "string" } },
{ "name": "platform", "type": "string" }, { "name": "platform", "type": "string" },
{ "name": "application", "type": "string" }, { "name": "application", "type": "string" },
{ "name": "remoteIdentifier", "type": "string" },
{ "name": "alternateIdentifier", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] } { "name": "transitUri", "type": ["null", "string"] }
] ]
} }

View File

@ -0,0 +1,122 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>SiteToSiteStatusReportingTask</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>
The Site-to-Site Status Reporting Task allows the user to publish Status events using the Site To Site protocol.
The component type and name filter regexes form a union: only components matching both regexes will be reported.
However, all process groups are recursively searched for matching components, regardless of whether the process
group matches the component filters.
</p>
<h2>Record writer</h2>
<p>
The user can define a Record Writer and directly specify the output format and data with the assumption that the
input schema is the following:
</p>
<pre>
<code>
{
"type" : "record",
"name" : "status",
"namespace" : "status",
"fields" : [
// common fields for all components
{ "name" : "statusId", "type" : "string"},
{ "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name" : "timestamp", "type" : "string"},
{ "name" : "actorHostname", "type" : "string"},
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},
// PG + RPG + Ports + Processors
{ "name" : "activeThreadCount", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "flowFilesReceived", "type" : ["long", "null"]},
{ "name" : "flowFilesSent", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "bytesReceived", "type" : ["long", "null"]},
{ "name" : "bytesSent", "type" : ["long", "null"]},
// PG + Connections
{ "name" : "queuedCount", "type" : ["long", "null"]},
// PG + Processors
{ "name" : "bytesRead", "type" : ["long", "null"]},
{ "name" : "bytesWritten", "type" : ["long", "null"]},
// fields for process group status
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
{ "name" : "inputContentSize", "type" : ["long", "null"]},
{ "name" : "outputContentSize", "type" : ["long", "null"]},
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
// fields for remote process groups
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
{ "name" : "inactiveRemotePortCount", "type" : ["long", "null"]},
{ "name" : "receivedContentSize", "type" : ["long", "null"]},
{ "name" : "receivedCount", "type" : ["long", "null"]},
{ "name" : "sentContentSize", "type" : ["long", "null"]},
{ "name" : "sentCount", "type" : ["long", "null"]},
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
// fields for input/output ports + connections + PG
{ "name" : "inputBytes", "type" : ["long", "null"]},
{ "name" : "inputCount", "type" : ["long", "null"]},
{ "name" : "outputBytes", "type" : ["long", "null"]},
{ "name" : "outputCount", "type" : ["long", "null"]},
// fields for connections
{ "name" : "sourceId", "type" : ["string", "null"]},
{ "name" : "sourceName", "type" : ["string", "null"]},
{ "name" : "destinationId", "type" : ["string", "null"]},
{ "name" : "destinationName", "type" : ["string", "null"]},
{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
{ "name" : "queuedBytes", "type" : ["long", "null"]},
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
// fields for processors
{ "name" : "processorType", "type" : ["string", "null"]},
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
{ "name" : "invocations", "type" : ["long", "null"]},
{ "name" : "processingNanos", "type" : ["long", "null"]}
]
}
</code>
</pre>
</body>
</html>

View File

@ -0,0 +1,21 @@
{
"type" : "record",
"name" : "bulletins",
"namespace" : "bulletins",
"fields" : [
{ "name" : "objectId", "type" : "string" },
{ "name" : "platform", "type" : "string" },
{ "name" : "bulletinId", "type" : "long" },
{ "name" : "bulletinCategory", "type" : ["string", "null"] },
{ "name" : "bulletinGroupId", "type" : ["string", "null"] },
{ "name" : "bulletinGroupName", "type" : ["string", "null"] },
{ "name" : "bulletinLevel", "type" : ["string", "null"] },
{ "name" : "bulletinMessage", "type" : ["string", "null"] },
{ "name" : "bulletinNodeAddress", "type" : ["string", "null"] },
{ "name" : "bulletinNodeId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
]
}

View File

@ -0,0 +1,35 @@
{
"namespace": "nifi",
"name": "provenanceEvent",
"type": "record",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": "long" },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name": "details", "type": "string" },
{ "name": "componentId", "type": "string" },
{ "name": "componentType", "type": "string" },
{ "name": "componentName", "type": "string" },
{ "name": "processGroupId", "type": "string" },
{ "name": "processGroupName", "type": "string" },
{ "name": "entityId", "type": "string" },
{ "name": "entityType", "type": "string" },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "actorHostname", "type": "string" },
{ "name": "contentURI", "type": "string" },
{ "name": "previousContentURI", "type": "string" },
{ "name": "parentIds", "type": { "type": "array", "items": "string" } },
{ "name": "childIds", "type": { "type": "array", "items": "string" } },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "remoteIdentifier", "type": "string" },
{ "name": "alternateIdentifier", "type": "string" },
{ "name": "transitUri", "type": ["null", "string"] }
]
}

View File

@ -0,0 +1,78 @@
{
"type" : "record",
"name" : "status",
"namespace" : "status",
"fields" : [
// common fields for all components
{ "name" : "statusId", "type" : "string"},
{ "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name" : "timestamp", "type" : "string"},
{ "name" : "actorHostname", "type" : "string"},
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},
// PG + RPG + Ports + Processors
{ "name" : "activeThreadCount", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "flowFilesReceived", "type" : ["long", "null"]},
{ "name" : "flowFilesSent", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "bytesReceived", "type" : ["long", "null"]},
{ "name" : "bytesSent", "type" : ["long", "null"]},
// PG + Connections
{ "name" : "queuedCount", "type" : ["long", "null"]},
// PG + Processors
{ "name" : "bytesRead", "type" : ["long", "null"]},
{ "name" : "bytesWritten", "type" : ["long", "null"]},
// fields for process group status
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
{ "name" : "inputContentSize", "type" : ["long", "null"]},
{ "name" : "outputContentSize", "type" : ["long", "null"]},
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
// fields for remote process groups
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
{ "name" : "inactiveRemotePortCount", "type" : ["long", "null"]},
{ "name" : "receivedContentSize", "type" : ["long", "null"]},
{ "name" : "receivedCount", "type" : ["long", "null"]},
{ "name" : "sentContentSize", "type" : ["long", "null"]},
{ "name" : "sentCount", "type" : ["long", "null"]},
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
// fields for input/output ports + connections + PG
{ "name" : "inputBytes", "type" : ["long", "null"]},
{ "name" : "inputCount", "type" : ["long", "null"]},
{ "name" : "outputBytes", "type" : ["long", "null"]},
{ "name" : "outputCount", "type" : ["long", "null"]},
// fields for connections
{ "name" : "sourceId", "type" : ["string", "null"]},
{ "name" : "sourceName", "type" : ["string", "null"]},
{ "name" : "destinationId", "type" : ["string", "null"]},
{ "name" : "destinationName", "type" : ["string", "null"]},
{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
{ "name" : "queuedBytes", "type" : ["long", "null"]},
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
// fields for processors
{ "name" : "processorType", "type" : ["string", "null"]},
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
{ "name" : "invocations", "type" : ["long", "null"]},
{ "name" : "processingNanos", "type" : ["long", "null"]}
]
}

View File

@ -125,6 +125,10 @@ public class TestSiteToSiteBulletinReportingTask {
private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask {
public MockSiteToSiteBulletinReportingTask() throws IOException {
super();
}
final List<byte[]> dataSent = new ArrayList<>(); final List<byte[]> dataSent = new ArrayList<>();
@Override @Override

View File

@ -17,7 +17,6 @@
package org.apache.nifi.reporting; package org.apache.nifi.reporting;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -53,7 +52,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.TestRunner;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -65,7 +63,6 @@ public class TestSiteToSiteMetricsReportingTask {
private ReportingContext context; private ReportingContext context;
private ProcessGroupStatus status; private ProcessGroupStatus status;
private TestRunner runner;
@Before @Before
public void setup() { public void setup() {

View File

@ -604,6 +604,10 @@ public class TestSiteToSiteProvenanceReportingTask {
private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask { private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask {
public MockSiteToSiteProvenanceReportingTask() throws IOException {
super();
}
final List<byte[]> dataSent = new ArrayList<>(); final List<byte[]> dataSent = new ArrayList<>();
@Override @Override

View File

@ -60,7 +60,7 @@ public class TestSiteToSiteStatusReportingTask {
private ReportingContext context; private ReportingContext context;
public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties, public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties,
ProcessGroupStatus pgStatus) throws InitializationException { ProcessGroupStatus pgStatus) throws InitializationException, IOException {
final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask(); final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask();
Map<PropertyDescriptor, String> properties = new HashMap<>(); Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
@ -338,6 +338,10 @@ public class TestSiteToSiteStatusReportingTask {
private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask { private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask {
public MockSiteToSiteStatusReportingTask() throws IOException {
super();
}
final List<byte[]> dataSent = new ArrayList<>(); final List<byte[]> dataSent = new ArrayList<>();
@Override @Override