diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java index 43372e1cd1..e85eca3002 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java @@ -60,7 +60,11 @@ public class ComponentMapHolder { return componentToParentGroupMap.get(componentId); } - public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) { + public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) { + return createComponentMap(status, new ParentProcessGroupSearchNode(status.getId(), null)); + } + + private static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisProcessGroupNode) { final ComponentMapHolder holder = new ComponentMapHolder(); final Map componentNameMap = holder.componentNameMap; final Map componentToParentGroupMap = holder.componentToParentGroupMap; @@ -68,37 +72,31 @@ public class ComponentMapHolder { final Map destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap; if (status != null) { - ParentProcessGroupSearchNode parentNode = thisNode; componentNameMap.put(status.getId(), status.getName()); - // Put a root entry in if one does not yet exist - if (parentNode == null) { - parentNode = new ParentProcessGroupSearchNode(status.getId(), null); - componentToParentGroupMap.put(status.getId(), parentNode); - } for (final ProcessorStatus procStatus : status.getProcessorStatus()) { componentNameMap.put(procStatus.getId(), procStatus.getName()); - componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); + componentToParentGroupMap.put(procStatus.getId(), thisProcessGroupNode); } for (final PortStatus portStatus : status.getInputPortStatus()) { componentNameMap.put(portStatus.getId(), portStatus.getName()); - componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); + componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode); } for (final PortStatus portStatus : status.getOutputPortStatus()) { componentNameMap.put(portStatus.getId(), portStatus.getName()); - componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); + componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode); } for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { componentNameMap.put(rpgStatus.getId(), rpgStatus.getName()); - componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); + componentToParentGroupMap.put(rpgStatus.getId(), thisProcessGroupNode); } for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) { componentNameMap.put(connectionStatus.getId(), connectionStatus.getName()); - componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); + componentToParentGroupMap.put(connectionStatus.getId(), thisProcessGroupNode); // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus. componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName()); componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName()); @@ -108,9 +106,9 @@ public class ComponentMapHolder { for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { componentNameMap.put(childGroup.getId(), childGroup.getName()); - ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode); - componentToParentGroupMap.put(childGroup.getId(), node); - holder.putAll(createComponentMap(childGroup, node)); + ParentProcessGroupSearchNode childProcessGroupNode = new ParentProcessGroupSearchNode(childGroup.getId(), thisProcessGroupNode); + componentToParentGroupMap.put(childGroup.getId(), thisProcessGroupNode); + holder.putAll(createComponentMap(childGroup, childProcessGroupNode)); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index feb302ae53..18a8afe393 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -122,8 +122,7 @@ public class ProvenanceEventConsumer { } final EventAccess eventAccess = context.getEventAccess(); final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus(); - final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null); - final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode); + final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus); final StateManager stateManager = context.getStateManager(); Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId(); @@ -246,13 +245,15 @@ public class ProvenanceEventConsumer { if (StringUtils.isEmpty(processGroupId)) { continue; } - // Check if any parent process group has the specified component ID - ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId); - while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) { - matchedComponent = matchedComponent.getParent(); - } - if (matchedComponent == null) { - continue; + // Check if the process group or any parent process group is specified as a target component ID. + if (!componentIds.contains(processGroupId)) { + ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId); + while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) { + parentProcessGroup = parentProcessGroup.getParent(); + } + if (parentProcessGroup == null) { + continue; + } } } if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index 201361fe70..26d5877d7e 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -18,11 +18,15 @@ package org.apache.nifi.reporting; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.provenance.ProvenanceEventBuilder; @@ -49,6 +53,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,6 +70,10 @@ public class TestSiteToSiteProvenanceReportingTask { private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class); private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map properties) throws IOException { + return setup(event, properties, 2500); + } + + private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map properties, long maxEventId) throws IOException { final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask(); when(context.getStateManager()) @@ -85,7 +94,6 @@ public class TestSiteToSiteProvenanceReportingTask { } }).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class)); - final long maxEventId = 2500; final AtomicInteger totalEvents = new AtomicInteger(0); final EventAccess eventAccess = Mockito.mock(EventAccess.class); @@ -106,9 +114,65 @@ public class TestSiteToSiteProvenanceReportingTask { return eventsToReturn; } }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); - ProcessGroupStatus processGroupStatus = new ProcessGroupStatus(); - processGroupStatus.setId("root"); - when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus); + ProcessGroupStatus pgRoot = new ProcessGroupStatus(); + pgRoot.setId("root"); + when(eventAccess.getControllerStatus()).thenReturn(pgRoot); + + // Add child Process Groups. + // Root -> (A, B -> (B2 -> (B3))) + final ProcessGroupStatus pgA = new ProcessGroupStatus(); + pgA.setId("pgA"); + final ProcessGroupStatus pgB = new ProcessGroupStatus(); + pgB.setId("pgB"); + final ProcessGroupStatus pgB2 = new ProcessGroupStatus(); + pgB2.setId("pgB2"); + final ProcessGroupStatus pgB3 = new ProcessGroupStatus(); + pgB3.setId("pgB3"); + final Collection childPGs = pgRoot.getProcessGroupStatus(); + childPGs.add(pgA); + childPGs.add(pgB); + pgB.getProcessGroupStatus().add(pgB2); + pgB2.getProcessGroupStatus().add(pgB3); + + // Add Processors. + final ProcessorStatus prcRoot = new ProcessorStatus(); + prcRoot.setId("1234"); + pgRoot.getProcessorStatus().add(prcRoot); + + final ProcessorStatus prcA = new ProcessorStatus(); + prcA.setId("A001"); + prcA.setName("Processor in PGA"); + pgA.getProcessorStatus().add(prcA); + + final ProcessorStatus prcB = new ProcessorStatus(); + prcB.setId("B001"); + prcB.setName("Processor in PGB"); + pgB.getProcessorStatus().add(prcB); + + final ProcessorStatus prcB2 = new ProcessorStatus(); + prcB2.setId("B201"); + prcB2.setName("Processor in PGB2"); + pgB2.getProcessorStatus().add(prcB2); + + final ProcessorStatus prcB3 = new ProcessorStatus(); + prcB3.setId("B301"); + prcB3.setName("Processor in PGB3"); + pgB3.getProcessorStatus().add(prcB3); + + // Add connection status to test Remote Input/Output Ports + final ConnectionStatus b2RemoteInputPort = new ConnectionStatus(); + b2RemoteInputPort.setGroupId("pgB2"); + b2RemoteInputPort.setSourceId("B201"); + b2RemoteInputPort.setDestinationId("riB2"); + b2RemoteInputPort.setDestinationName("Remote Input Port name"); + pgB2.getConnectionStatus().add(b2RemoteInputPort); + + final ConnectionStatus b3RemoteOutputPort = new ConnectionStatus(); + b3RemoteOutputPort.setGroupId("pgB3"); + b3RemoteOutputPort.setSourceId("roB3"); + b3RemoteOutputPort.setSourceName("Remote Output Port name"); + b3RemoteOutputPort.setDestinationId("B301"); + pgB3.getConnectionStatus().add(b3RemoteOutputPort); final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); Mockito.doAnswer(new Answer() { @@ -307,6 +371,90 @@ public class TestSiteToSiteProvenanceReportingTask { assertEquals(3, task.dataSent.size()); } + @Test + public void testFilterProcessGroupId() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "pgB2"); + + + // B201 belongs to ProcessGroup B2, so it should be picked. + ProvenanceEventRecord event = createProvenanceEventRecord("B201", "dummy"); + MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0); + assertEquals("B201", reportedEvent.get("componentId").asText()); + assertEquals("Processor in PGB2", reportedEvent.get("componentName").asText()); + + + // B301 belongs to PG B3, whose parent is PGB2, so it should be picked, too. + event = createProvenanceEventRecord("B301", "dummy"); + task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0); + assertEquals("B301", reportedEvent.get("componentId").asText()); + assertEquals("Processor in PGB3", reportedEvent.get("componentName").asText()); + + // A001 belongs to PG A, whose parent is the root PG, so it should be filtered out. + event = createProvenanceEventRecord("A001", "dummy"); + task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(0, task.dataSent.size()); + } + + @Test + public void testRemotePorts() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "riB2,roB3"); + + + // riB2 is a Remote Input Port in Process Group B2. + ProvenanceEventRecord event = createProvenanceEventRecord("riB2", "Remote Input Port"); + MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0); + assertEquals("riB2", reportedEvent.get("componentId").asText()); + assertEquals("Remote Input Port name", reportedEvent.get("componentName").asText()); + assertEquals("pgB2", reportedEvent.get("processGroupId").asText()); + + + // roB3 is a Remote Output Port in Process Group B3. + event = createProvenanceEventRecord("roB3", "Remote Output Port"); + task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0); + assertEquals("roB3", reportedEvent.get("componentId").asText()); + assertEquals("Remote Output Port name", reportedEvent.get("componentName").asText()); + assertEquals("pgB3", reportedEvent.get("processGroupId").asText()); + + } + @Test public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException { final long maxEventId = 2500; @@ -353,6 +501,9 @@ public class TestSiteToSiteProvenanceReportingTask { } private ProvenanceEventRecord createProvenanceEventRecord() { + return createProvenanceEventRecord("1234", "dummy processor"); + } + private ProvenanceEventRecord createProvenanceEventRecord(final String componentId, final String componentType) { final String uuid = "10000000-0000-0000-0000-000000000000"; final Map attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -369,8 +520,8 @@ public class TestSiteToSiteProvenanceReportingTask { attributes.put("uuid", uuid); builder.fromFlowFile(createFlowFile(3L, attributes)); builder.setAttributes(prevAttrs, attributes); - builder.setComponentId("1234"); - builder.setComponentType("dummy processor"); + builder.setComponentId(componentId); + builder.setComponentType(componentType); return builder.build(); }