diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java index 342b5a27b0..43372e1cd1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java @@ -24,13 +24,12 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import java.util.HashMap; import java.util.Map; -import java.util.Stack; public class ComponentMapHolder { private static final String REMOTE_INPUT_PORT = "Remote Input Port"; private static final String REMOTE_OUTPUT_PORT = "Remote Output Port"; private final Map componentNameMap = new HashMap<>(); - private final Map componentToParentGroupMap = new HashMap<>(); + private final Map componentToParentGroupMap = new HashMap<>(); private final Map sourceToConnectionParentGroupMap = new HashMap<>(); private final Map destinationToConnectionParentGroupMap = new HashMap<>(); @@ -46,21 +45,6 @@ public class ComponentMapHolder { return componentNameMap.get(componentId); } - public Stack getProcessGroupIdStack(final String startingProcessGroupId) { - final Stack stack = new Stack<>(); - String processGroupId = startingProcessGroupId; - stack.push(startingProcessGroupId); - while (componentToParentGroupMap.containsKey(processGroupId)) { - final String parentGroupId = componentToParentGroupMap.get(processGroupId); - if (parentGroupId == null || parentGroupId.isEmpty()) { - break; - } - stack.push(parentGroupId); - processGroupId = parentGroupId; - } - return stack; - } - public String getProcessGroupId(final String componentId, final String componentType) { // Where a Remote Input/Output Port resides is only available at ConnectionStatus. if (REMOTE_INPUT_PORT.equals(componentType)) { @@ -68,42 +52,53 @@ public class ComponentMapHolder { } else if (REMOTE_OUTPUT_PORT.equals(componentType)) { return sourceToConnectionParentGroupMap.get(componentId); } + ParentProcessGroupSearchNode parentNode = componentToParentGroupMap.get(componentId); + return parentNode == null ? null : parentNode.getId(); + } + + public ParentProcessGroupSearchNode getProcessGroupParent(final String componentId) { return componentToParentGroupMap.get(componentId); } - public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) { + public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) { final ComponentMapHolder holder = new ComponentMapHolder(); final Map componentNameMap = holder.componentNameMap; - final Map componentToParentGroupMap = holder.componentToParentGroupMap; + final Map componentToParentGroupMap = holder.componentToParentGroupMap; final Map sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap; final Map destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap; if (status != null) { + ParentProcessGroupSearchNode parentNode = thisNode; componentNameMap.put(status.getId(), status.getName()); + // Put a root entry in if one does not yet exist + if (parentNode == null) { + parentNode = new ParentProcessGroupSearchNode(status.getId(), null); + componentToParentGroupMap.put(status.getId(), parentNode); + } for (final ProcessorStatus procStatus : status.getProcessorStatus()) { componentNameMap.put(procStatus.getId(), procStatus.getName()); - componentToParentGroupMap.put(procStatus.getId(), status.getId()); + componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); } for (final PortStatus portStatus : status.getInputPortStatus()) { componentNameMap.put(portStatus.getId(), portStatus.getName()); - componentToParentGroupMap.put(portStatus.getId(), status.getId()); + componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); } for (final PortStatus portStatus : status.getOutputPortStatus()) { componentNameMap.put(portStatus.getId(), portStatus.getName()); - componentToParentGroupMap.put(portStatus.getId(), status.getId()); + componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); } for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { componentNameMap.put(rpgStatus.getId(), rpgStatus.getName()); - componentToParentGroupMap.put(rpgStatus.getId(), status.getId()); + componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); } for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) { componentNameMap.put(connectionStatus.getId(), connectionStatus.getName()); - componentToParentGroupMap.put(connectionStatus.getId(), status.getId()); + componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode)); // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus. componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName()); componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName()); @@ -113,8 +108,9 @@ public class ComponentMapHolder { for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { componentNameMap.put(childGroup.getId(), childGroup.getName()); - componentToParentGroupMap.put(childGroup.getId(), status.getId()); - holder.putAll(createComponentMap(childGroup)); + ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode); + componentToParentGroupMap.put(childGroup.getId(), node); + holder.putAll(createComponentMap(childGroup, node)); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java new file mode 100644 index 0000000000..2cf1f48980 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.provenance; + + +public class ParentProcessGroupSearchNode { + + private final String id; + private final ParentProcessGroupSearchNode parent; + + public ParentProcessGroupSearchNode(String id, ParentProcessGroupSearchNode parent) { + this.id = id; + this.parent = parent; + } + + public ParentProcessGroupSearchNode getParent() { + return parent; + } + + public String getId() { + return id; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 75c1e6037a..feb302ae53 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -89,13 +89,13 @@ public class ProvenanceEventConsumer { } } - public void addTargetEventType(final ProvenanceEventType ... types) { + public void addTargetEventType(final ProvenanceEventType... types) { for (ProvenanceEventType type : types) { eventTypes.add(type); } } - public void addTargetComponentId(final String ... ids) { + public void addTargetComponentId(final String... ids) { for (String id : ids) { componentIds.add(id); } @@ -122,12 +122,13 @@ public class ProvenanceEventConsumer { } final EventAccess eventAccess = context.getEventAccess(); final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus(); - final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus); + final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null); + final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode); final StateManager stateManager = context.getStateManager(); Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId(); - if(currMaxId == null) { + if (currMaxId == null) { logger.debug("No events to send because no events have been created yet."); return; } @@ -156,7 +157,7 @@ public class ProvenanceEventConsumer { firstEventId = -1; } else { logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + - "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId}); + "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[]{currMaxId, firstEventId}); firstEventId = currMaxId; } } @@ -218,7 +219,7 @@ public class ProvenanceEventConsumer { stateManager.setState(newMapOfState, Scope.LOCAL); } catch (final IOException ioe) { logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", - new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe); + new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe); } return lastEvent.getEventId() + 1; @@ -230,28 +231,34 @@ public class ProvenanceEventConsumer { } private List filterEvents(ComponentMapHolder componentMapHolder, List provenanceEvents) { - if(isFilteringEnabled()) { + if (isFilteringEnabled()) { List filteredEvents = new ArrayList<>(); for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) { - if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) { + final String componentId = provenanceEventRecord.getComponentId(); + if (!componentIds.isEmpty() && !componentIds.contains(componentId)) { // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs // that is being filtered on if (componentMapHolder == null) { continue; } - final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType()); - if (processGroupId == null || processGroupId.isEmpty()) { + final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType()); + if (StringUtils.isEmpty(processGroupId)) { continue; } - if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) { + // Check if any parent process group has the specified component ID + ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId); + while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) { + matchedComponent = matchedComponent.getParent(); + } + if (matchedComponent == null) { continue; } } - if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { + if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { continue; } - if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) { + if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) { continue; } filteredEvents.add(provenanceEventRecord); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index ec2e3013a5..201361fe70 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -22,6 +22,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.provenance.ProvenanceEventBuilder; @@ -55,6 +56,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; public class TestSiteToSiteProvenanceReportingTask { @@ -65,7 +67,7 @@ public class TestSiteToSiteProvenanceReportingTask { private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map properties) throws IOException { final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask(); - Mockito.when(context.getStateManager()) + when(context.getStateManager()) .thenReturn(new MockStateManager(task)); Mockito.doAnswer(new Answer() { @Override @@ -104,6 +106,9 @@ public class TestSiteToSiteProvenanceReportingTask { return eventsToReturn; } }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); + ProcessGroupStatus processGroupStatus = new ProcessGroupStatus(); + processGroupStatus.setId("root"); + when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus); final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); Mockito.doAnswer(new Answer() { @@ -113,12 +118,12 @@ public class TestSiteToSiteProvenanceReportingTask { } }).when(provenanceRepository).getMaxEventId(); - Mockito.when(context.getEventAccess()).thenReturn(eventAccess); - Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); + when(context.getEventAccess()).thenReturn(eventAccess); + when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); final ComponentLog logger = Mockito.mock(ComponentLog.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + when(initContext.getLogger()).thenReturn(logger); return task; } @@ -331,7 +336,7 @@ public class TestSiteToSiteProvenanceReportingTask { // setup the mock EventAccess to return the mock provenance repository final EventAccess eventAccess = Mockito.mock(EventAccess.class); - Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); + when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); task.initialize(initContext); @@ -388,7 +393,7 @@ public class TestSiteToSiteProvenanceReportingTask { } }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); - Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); } catch (final Exception e) { e.printStackTrace(); Assert.fail(e.toString());