NIFI-8522 NiFi can duplicate controller services when generating templates

Adding final keywords and capitalize comment for process group section

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5086.
This commit is contained in:
Timea Barna 2021-05-19 14:46:07 +02:00 committed by Pierre Villard
parent 9fe1f56019
commit bf960cae2e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 45 additions and 81 deletions

View File

@ -75,7 +75,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Template utilities.
@ -163,13 +162,13 @@ public final class SnippetUtils {
if (includeControllerServices) {
// Include all referenced services that are not already included in this snippet.
getControllerServices(processor.getEffectivePropertyValues()).stream()
.filter(allServicesReferenced::add)
.forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
svc.setParentGroupId(destinationGroupId);
controllerServices.add(svc);
});
.filter(allServicesReferenced::add)
.forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
svc.setParentGroupId(destinationGroupId);
controllerServices.add(svc);
});
}
}
}
@ -235,23 +234,42 @@ public final class SnippetUtils {
}
// add any process groups
final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>();
if (!snippet.getProcessGroups().isEmpty()) {
for (final String childGroupId : snippet.getProcessGroups().keySet()) {
final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
if (childGroup == null) {
throw new IllegalStateException("A process group in this snippet could not be found.");
}
final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
final Set<ProcessGroupDTO> processGroups = highestProcessGroupDTO.getContents().getProcessGroups();
fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup);
final ProcessGroupDTO childGroupDto = dtoFactory.createProcessGroupDto(childGroup, recurse);
processGroups.add(childGroupDto);
// Maintain a listing of visited groups starting with each group in the snippet.
// This is used to determine whether a referenced controller service should be included in the resulting snippet.
// If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet
// and will only be included when the includeControllerServices is set to true.
// This happens above when considering the processors in this snippet.
final Set<String> visitedGroupIds = new HashSet<>();
for (final String groupIdentifier : contentsByGroup.keySet()) {
// maintain a listing of visited groups starting with each group in the snippet. this is used to determine
// whether a referenced controller service should be included in the resulting snippet. if the service is
// defined at groupId or one of it's ancestors, its considered outside of this snippet and will only be included
// when the includeControllerServices is set to true. this happens above when considering the processors in this snippet
final Set<String> visitedGroupIds = new HashSet<>();
addControllerServices(childGroup, childGroupDto, allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier());
// Include this group in the ancestry for this snippet, services only get included if the includeControllerServices
// flag is set or if the service is defined within this groups hierarchy within the snippet.
if (!groupIdentifier.equals(processGroup.getIdentifier())) {
visitedGroupIds.add(groupIdentifier);
}
for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) {
// Include all referenced services that are not already included in this snippet.
getControllerServices(procNode.getEffectivePropertyValues()).stream()
.filter(allServicesReferenced::add)
.filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
.forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
svc.setParentGroupId(destinationGroupId);
final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId);
Set<ControllerServiceDTO> services = contents.getControllerServices();
if (services == null) {
contents.setControllerServices(Collections.singleton(svc));
} else {
services.add(svc);
contents.setControllerServices(services);
}
});
}
}
@ -267,7 +285,6 @@ public final class SnippetUtils {
}
}
// Normalize the coordinates based on the locations of the other components
final List<? extends ComponentDTO> components = new ArrayList<>();
components.addAll((Set) processors);
@ -299,62 +316,10 @@ public final class SnippetUtils {
return snippetDto;
}
/**
* Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and
* adds them to the given servicesByGroup map
*
* @param group the Process Group to start from
* @param dto the DTO representation of the Process Group
* @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services
* @param contentsByGroup a Map of Process Group ID to the Process Group's contents
* @param highestGroupId the UUID of the 'highest' process group in the snippet
*/
private void addControllerServices(final ProcessGroup group, final ProcessGroupDTO dto, final Set<ControllerServiceDTO> allServicesReferenced,
final boolean includeControllerServices, final Set<String> visitedGroupIds, final Map<String, FlowSnippetDTO> contentsByGroup, final String highestGroupId) {
final FlowSnippetDTO contents = dto.getContents();
contentsByGroup.put(dto.getId(), contents);
if (contents == null) {
return;
}
// include this group in the ancestry for this snippet, services only get included if the includeControllerServices
// flag is set or if the service is defined within this groups hierarchy within the snippet
visitedGroupIds.add(group.getIdentifier());
for (final ProcessorNode procNode : group.getProcessors()) {
// Include all referenced services that are not already included in this snippet.
getControllerServices(procNode.getEffectivePropertyValues()).stream()
.filter(allServicesReferenced::add)
.filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
.forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId;
svc.setParentGroupId(destinationGroupId);
final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId);
if (snippetDto != null) {
Set<ControllerServiceDTO> services = snippetDto.getControllerServices();
if (services == null) {
snippetDto.setControllerServices(Collections.singleton(svc));
} else {
services.add(svc);
snippetDto.setControllerServices(services);
}
}
});
}
// Map child process group ID to the child process group for easy lookup
final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
.collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto));
for (final ProcessGroup childGroup : group.getProcessGroups()) {
final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier());
if (childDto == null) {
continue;
}
addControllerServices(childGroup, childDto, allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId);
private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map<String, FlowSnippetDTO> contentByGroupMap) {
for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) {
contentByGroupMap.put(group.getId(), group.getContents());
fillContentsByGroupMap(group, contentByGroupMap);
}
}
@ -380,7 +345,6 @@ public final class SnippetUtils {
return serviceDtos;
}
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
resolveNameConflicts(snippetCopy, group);