From 58bcd6c5ddc1989e99b5630b89f413ef4726b4a0 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 4 Feb 2020 22:35:11 -0500 Subject: [PATCH] NIFI-7106 - Add parent name and parent path in SiteToSiteStatusReportingTask Signed-off-by: Matthew Burgess This closes #4039 --- .../SiteToSiteStatusReportingTask.java | 62 +++++++++++-------- .../additionalDetails.html | 2 + .../src/main/resources/schema-status.avsc | 2 + .../TestSiteToSiteStatusReportingTask.java | 5 ++ 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 2466827531..31009f84b8 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -94,6 +94,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa private volatile Pattern componentTypeFilter; private volatile Pattern componentNameFilter; + private volatile Map processGroupIDToPath; public SiteToSiteStatusReportingTask() throws IOException { final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc"); @@ -122,6 +123,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).evaluateAttributeExpressions().getValue()); componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).evaluateAttributeExpressions().getValue()); + // initialize the map + processGroupIDToPath = new HashMap(); + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); @@ -145,8 +149,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa df.setTimeZone(TimeZone.getTimeZone("Z")); final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); - serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName, - platform, null, new Date(), allowNullValues); + serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, + hostname, rootGroupName, platform, null, new Date(), allowNullValues); final JsonArray jsonArray = arrayBuilder.build(); @@ -230,22 +234,26 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa * The root process group name * @param platform * The configured platform - * @param parentId - * The parent's component id + * @param parent + * The parent's process group status object * @param currentDate * The current date * @param allowNullValues * Allow null values */ private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, - final ProcessGroupStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, Boolean allowNullValues) { + final ProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, + final String platform, final ProcessGroupStatus parent, final Date currentDate, Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); - final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup"; + final String componentType = parent == null ? "RootProcessGroup" : "ProcessGroup"; final String componentName = status.getName(); + if(parent == null) { + processGroupIDToPath.put(status.getId(), "NiFi Flow"); + } + if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues); addField(builder, "componentId", status.getId(), allowNullValues); @@ -271,40 +279,43 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + + processGroupIDToPath.put(childGroupStatus.getId(), processGroupIDToPath.get(status.getId()) + " / " + childGroupStatus.getName()); + serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname, - applicationName, platform, status.getId(), currentDate, allowNullValues); + applicationName, platform, status, currentDate, allowNullValues); } for(ProcessorStatus processorStatus : status.getProcessorStatus()) { serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname, - applicationName, platform, status.getId(), currentDate, allowNullValues); + applicationName, platform, status, currentDate, allowNullValues); } for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname, - applicationName, platform, status.getId(), currentDate, allowNullValues); + applicationName, platform, status, currentDate, allowNullValues); } for(PortStatus portStatus : status.getInputPortStatus()) { serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df, - hostname, applicationName, platform, status.getId(), currentDate, allowNullValues); + hostname, applicationName, platform, status, currentDate, allowNullValues); } for(PortStatus portStatus : status.getOutputPortStatus()) { serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df, - hostname, applicationName, platform, status.getId(), currentDate, allowNullValues); + hostname, applicationName, platform, status, currentDate, allowNullValues); } for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname, - applicationName, platform, status.getId(), currentDate, allowNullValues); + applicationName, platform, status, currentDate, allowNullValues); } } private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, - final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { + final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "RemoteProcessGroup"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues); addField(builder, "componentId", status.getId(), allowNullValues); @@ -324,12 +335,12 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, - final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { + final DateFormat df, final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues); addField(builder, "componentId", status.getId(), allowNullValues); @@ -350,13 +361,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { + final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Connection"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues); addField(builder, "componentId", status.getId(), allowNullValues); @@ -383,13 +394,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, - final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) { + final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Processor"; final String componentName = status.getName(); if (componentMatchesFilters(componentType, componentName)) { - addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName, allowNullValues); + addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues); addField(builder, "componentId", status.getId(), allowNullValues); addField(builder, "processorType", status.getType(), allowNullValues); @@ -418,7 +429,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, - final String applicationName, final String platform, final String parentId, final Date currentDate, + final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final String componentType, final String componentName, Boolean allowNullValues) { addField(builder, "statusId", UUID.randomUUID().toString(), allowNullValues); addField(builder, "timestampMillis", currentDate.getTime(), allowNullValues); @@ -426,12 +437,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "actorHostname", hostname, allowNullValues); addField(builder, "componentType", componentType, allowNullValues); addField(builder, "componentName", componentName, allowNullValues); - addField(builder, "parentId", parentId, allowNullValues); + addField(builder, "parentId", parent == null ? null : parent.getId(), allowNullValues); + addField(builder, "parentName", parent == null ? null : parent.getName(), allowNullValues); + addField(builder, "parentPath", parent == null ? null : processGroupIDToPath.get(parent.getId()), allowNullValues); addField(builder, "platform", platform, allowNullValues); addField(builder, "application", applicationName, allowNullValues); } - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values, final Boolean allowNullValues) { if (values != null) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html index dad56563f5..f6dca5568a 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html @@ -51,6 +51,8 @@ { "name" : "componentType", "type" : "string"}, { "name" : "componentName", "type" : "string"}, { "name" : "parentId", "type" : ["string", "null"]}, + { "name" : "parentName", "type" : ["string", "null"]}, + { "name" : "parentPath", "type" : ["string", "null"]}, { "name" : "platform", "type" : "string"}, { "name" : "application", "type" : "string"}, { "name" : "componentId", "type" : "string"}, diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc index 1932c7e9ba..70e4af1d76 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc @@ -12,6 +12,8 @@ { "name" : "componentType", "type" : "string"}, { "name" : "componentName", "type" : "string"}, { "name" : "parentId", "type" : ["string", "null"]}, + { "name" : "parentName", "type" : ["string", "null"]}, + { "name" : "parentPath", "type" : ["string", "null"]}, { "name" : "platform", "type" : "string"}, { "name" : "application", "type" : "string"}, { "name" : "componentId", "type" : "string"}, diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 9914bf1386..3dfab492f6 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; @@ -333,6 +334,10 @@ public class TestSiteToSiteStatusReportingTask { final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); JsonObject object = jsonReader.readArray().getJsonObject(0); + JsonString parentName = object.getJsonString("parentName"); + assertTrue(parentName.getString().startsWith("Awesome.1-")); + JsonString parentPath = object.getJsonString("parentPath"); + assertTrue(parentPath.getString().startsWith("NiFi Flow / Awesome.1")); JsonString runStatus = object.getJsonString("runStatus"); assertEquals(RunStatus.Running.name(), runStatus.getString()); JsonNumber inputBytes = object.getJsonNumber("inputBytes");