NIFI-7106 - Add parent name and parent path in SiteToSiteStatusReportingTask

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4039
This commit is contained in:
Pierre Villard 2020-02-04 22:35:11 -05:00 committed by Matthew Burgess
parent f738e19a75
commit 58bcd6c5dd
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 46 additions and 25 deletions

View File

@ -94,6 +94,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
private volatile Pattern componentTypeFilter;
private volatile Pattern componentNameFilter;
private volatile Map<String,String> processGroupIDToPath;
public SiteToSiteStatusReportingTask() throws IOException {
final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc");
@ -122,6 +123,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).evaluateAttributeExpressions().getValue());
componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).evaluateAttributeExpressions().getValue());
// initialize the map
processGroupIDToPath = new HashMap<String,String>();
final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
@ -145,8 +149,8 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
df.setTimeZone(TimeZone.getTimeZone("Z"));
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName,
platform, null, new Date(), allowNullValues);
serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df,
hostname, rootGroupName, platform, null, new Date(), allowNullValues);
final JsonArray jsonArray = arrayBuilder.build();
@ -230,22 +234,26 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
* The root process group name
* @param platform
* The configured platform
* @param parentId
* The parent's component id
* @param parent
* The parent's process group status object
* @param currentDate
* The current date
* @param allowNullValues
* Allow null values
*/
private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final ProcessGroupStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, Boolean allowNullValues) {
final ProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
final String platform, final ProcessGroupStatus parent, final Date currentDate, Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup";
final String componentType = parent == null ? "RootProcessGroup" : "ProcessGroup";
final String componentName = status.getName();
if(parent == null) {
processGroupIDToPath.put(status.getId(), "NiFi Flow");
}
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@ -271,40 +279,43 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
}
for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
processGroupIDToPath.put(childGroupStatus.getId(), processGroupIDToPath.get(status.getId()) + " / " + childGroupStatus.getName());
serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate, allowNullValues);
applicationName, platform, status, currentDate, allowNullValues);
}
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname,
applicationName, platform, status.getId(), currentDate, allowNullValues);
applicationName, platform, status, currentDate, allowNullValues);
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname,
applicationName, platform, status.getId(), currentDate, allowNullValues);
applicationName, platform, status, currentDate, allowNullValues);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate, allowNullValues);
hostname, applicationName, platform, status, currentDate, allowNullValues);
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df,
hostname, applicationName, platform, status.getId(), currentDate, allowNullValues);
hostname, applicationName, platform, status, currentDate, allowNullValues);
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname,
applicationName, platform, status.getId(), currentDate, allowNullValues);
applicationName, platform, status, currentDate, allowNullValues);
}
}
private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "RemoteProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@ -324,12 +335,12 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
}
private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final DateFormat df, final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@ -350,13 +361,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
}
private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Connection";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate,
addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@ -383,13 +394,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
}
private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final Boolean allowNullValues) {
final String hostname, final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Processor";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName, allowNullValues);
addCommonFields(builder, df, hostname, applicationName, platform, parent, currentDate, componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "processorType", status.getType(), allowNullValues);
@ -418,7 +429,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
}
private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
final String applicationName, final String platform, final String parentId, final Date currentDate,
final String applicationName, final String platform, final ProcessGroupStatus parent, final Date currentDate,
final String componentType, final String componentName, Boolean allowNullValues) {
addField(builder, "statusId", UUID.randomUUID().toString(), allowNullValues);
addField(builder, "timestampMillis", currentDate.getTime(), allowNullValues);
@ -426,12 +437,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
addField(builder, "actorHostname", hostname, allowNullValues);
addField(builder, "componentType", componentType, allowNullValues);
addField(builder, "componentName", componentName, allowNullValues);
addField(builder, "parentId", parentId, allowNullValues);
addField(builder, "parentId", parent == null ? null : parent.getId(), allowNullValues);
addField(builder, "parentName", parent == null ? null : parent.getName(), allowNullValues);
addField(builder, "parentPath", parent == null ? null : processGroupIDToPath.get(parent.getId()), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "application", applicationName, allowNullValues);
}
private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, Long> values, final Boolean allowNullValues) {
if (values != null) {

View File

@ -51,6 +51,8 @@
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
{ "name" : "parentName", "type" : ["string", "null"]},
{ "name" : "parentPath", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},

View File

@ -12,6 +12,8 @@
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
{ "name" : "parentName", "type" : ["string", "null"]},
{ "name" : "parentPath", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
@ -333,6 +334,10 @@ public class TestSiteToSiteStatusReportingTask {
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
JsonString parentName = object.getJsonString("parentName");
assertTrue(parentName.getString().startsWith("Awesome.1-"));
JsonString parentPath = object.getJsonString("parentPath");
assertTrue(parentPath.getString().startsWith("NiFi Flow / Awesome.1"));
JsonString runStatus = object.getJsonString("runStatus");
assertEquals(RunStatus.Running.name(), runStatus.getString());
JsonNumber inputBytes = object.getJsonNumber("inputBytes");