diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index bde2f95580..5d3cfc6dd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; /** * Template utilities. @@ -145,8 +146,9 @@ public final class SnippetUtils { // we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the // Controller Services in our flow themselves!) final Set allServicesReferenced = new HashSet<>(); - final Map contentsByGroup = new HashMap<>(); - contentsByGroup.put(processGroup.getIdentifier(), snippetDto); + final Map contentsByGroup = new HashMap<>(); + final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse); + contentsByGroup.put(processGroup.getIdentifier(), highestProcessGroupDTO); // add any processors final Set controllerServices = new HashSet<>(); @@ -162,13 +164,13 @@ public final class SnippetUtils { if (includeControllerServices) { // Include all referenced services that are not already included in this snippet. getControllerServices(processor.getEffectivePropertyValues()).stream() - .filter(allServicesReferenced::add) - .forEach(svc -> { - final String svcGroupId = svc.getParentGroupId(); - final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); - svc.setParentGroupId(destinationGroupId); - controllerServices.add(svc); - }); + .filter(allServicesReferenced::add) + .forEach(svc -> { + final String svcGroupId = svc.getParentGroupId(); + final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); + svc.setParentGroupId(destinationGroupId); + controllerServices.add(svc); + }); } } } @@ -234,42 +236,33 @@ public final class SnippetUtils { } // add any process groups - final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse); - final Set processGroups = highestProcessGroupDTO.getContents().getProcessGroups(); - fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup); + final Set processGroups = new LinkedHashSet<>(); + if (!snippet.getProcessGroups().isEmpty()) { + Set snippetGroupIds = snippet.getProcessGroups().keySet(); - // Maintain a listing of visited groups starting with each group in the snippet. - // This is used to determine whether a referenced controller service should be included in the resulting snippet. - // If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet - // and will only be included when the includeControllerServices is set to true. - // This happens above when considering the processors in this snippet. - final Set visitedGroupIds = new HashSet<>(); - for (final String groupIdentifier : contentsByGroup.keySet()) { - - // Include this group in the ancestry for this snippet, services only get included if the includeControllerServices - // flag is set or if the service is defined within this groups hierarchy within the snippet. - if (!groupIdentifier.equals(processGroup.getIdentifier())) { - visitedGroupIds.add(groupIdentifier); + for (final ProcessGroupDTO group: highestProcessGroupDTO.getContents().getProcessGroups()) { + if (snippetGroupIds.contains(group.getId())) { + contentsByGroup.put(group.getId(), group); + addChildren(group, contentsByGroup); + } } - for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) { - // Include all referenced services that are not already included in this snippet. - getControllerServices(procNode.getEffectivePropertyValues()).stream() - .filter(allServicesReferenced::add) - .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId())) - .forEach(svc -> { - final String svcGroupId = svc.getParentGroupId(); - final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); - svc.setParentGroupId(destinationGroupId); - final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId); - Set services = contents.getControllerServices(); - if (services == null) { - contents.setControllerServices(Collections.singleton(svc)); - } else { - services.add(svc); - contents.setControllerServices(services); - } - }); + for (final String childGroupId : snippet.getProcessGroups().keySet()) { + final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); + if (childGroup == null) { + throw new IllegalStateException("A process group in this snippet could not be found."); + } + + ProcessGroupDTO childGroupDto = contentsByGroup.get(childGroupId); + + // maintain a listing of visited groups starting with each group in the snippet. this is used to determine + // whether a referenced controller service should be included in the resulting snippet. if the service is + // defined at groupId or one of it's ancestors, its considered outside of this snippet and will only be included + // when the includeControllerServices is set to true. this happens above when considering the processors in this snippet + final Set visitedGroupIds = new HashSet<>(); + addControllerServices(childGroup, childGroupDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier()); + + processGroups.add(childGroupDto); } } @@ -285,6 +278,7 @@ public final class SnippetUtils { } } + // Normalize the coordinates based on the locations of the other components final List components = new ArrayList<>(); components.addAll((Set) processors); @@ -297,7 +291,7 @@ public final class SnippetUtils { components.addAll((Set) remoteProcessGroups); normalizeCoordinates(components); - Set updatedControllerServices = snippetDto.getControllerServices(); + Set updatedControllerServices = contentsByGroup.get(processGroup.getIdentifier()).getContents().getControllerServices(); if (updatedControllerServices == null) { updatedControllerServices = new HashSet<>(); } @@ -316,10 +310,63 @@ public final class SnippetUtils { return snippetDto; } - private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map contentByGroupMap) { + private void addChildren(final ProcessGroupDTO processGroup, final Map contentByGroupMap) { for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) { - contentByGroupMap.put(group.getId(), group.getContents()); - fillContentsByGroupMap(group, contentByGroupMap); + contentByGroupMap.put(group.getId(), group); + addChildren(group, contentByGroupMap); + } + } + + /** + * Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and + * adds them to the given servicesByGroup map + * + * @param group the Process Group to start from + * @param contents Process Group's contents + * @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services + * @param contentsByGroup a Map of Process Group ID to the Process Group's contents + * @param highestGroupId the UUID of the 'highest' process group in the snippet + */ + private void addControllerServices(final ProcessGroup group, final FlowSnippetDTO contents, final Set allServicesReferenced, + final boolean includeControllerServices, final Set visitedGroupIds, final Map contentsByGroup, final String highestGroupId) { + + // include this group in the ancestry for this snippet, services only get included if the includeControllerServices + // flag is set or if the service is defined within this groups hierarchy within the snippet + visitedGroupIds.add(group.getIdentifier()); + + for (final ProcessorNode procNode : group.getProcessors()) { + // Include all referenced services that are not already included in this snippet. + getControllerServices(procNode.getEffectivePropertyValues()).stream() + .filter(allServicesReferenced::add) + .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId())) + .forEach(svc -> { + final String svcGroupId = svc.getParentGroupId(); + final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId; + svc.setParentGroupId(destinationGroupId); + final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId).getContents(); + if (snippetDto != null) { + Set services = snippetDto.getControllerServices(); + if (services == null) { + snippetDto.setControllerServices(Collections.singleton(svc)); + } else { + services.add(svc); + snippetDto.setControllerServices(services); + } + } + }); + } + + // Map child process group ID to the child process group for easy lookup + final Map childGroupMap = contents.getProcessGroups().stream() + .collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto)); + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier()); + if (childDto == null) { + continue; + } + + addControllerServices(childGroup, childDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId); } } @@ -345,6 +392,7 @@ public final class SnippetUtils { return serviceDtos; } + public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) { final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy); resolveNameConflicts(snippetCopy, group);