mirror of https://github.com/apache/nifi.git
NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation
This closes #5109. Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
parent
96d521159b
commit
f23dcb05f6
|
@ -75,6 +75,7 @@ import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Template utilities.
|
* Template utilities.
|
||||||
|
@ -145,8 +146,9 @@ public final class SnippetUtils {
|
||||||
// we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the
|
// we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the
|
||||||
// Controller Services in our flow themselves!)
|
// Controller Services in our flow themselves!)
|
||||||
final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>();
|
final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>();
|
||||||
final Map<String, FlowSnippetDTO> contentsByGroup = new HashMap<>();
|
final Map<String, ProcessGroupDTO> contentsByGroup = new HashMap<>();
|
||||||
contentsByGroup.put(processGroup.getIdentifier(), snippetDto);
|
final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
|
||||||
|
contentsByGroup.put(processGroup.getIdentifier(), highestProcessGroupDTO);
|
||||||
|
|
||||||
// add any processors
|
// add any processors
|
||||||
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
|
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
|
||||||
|
@ -234,42 +236,33 @@ public final class SnippetUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add any process groups
|
// add any process groups
|
||||||
final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
|
final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>();
|
||||||
final Set<ProcessGroupDTO> processGroups = highestProcessGroupDTO.getContents().getProcessGroups();
|
if (!snippet.getProcessGroups().isEmpty()) {
|
||||||
fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup);
|
Set<String> snippetGroupIds = snippet.getProcessGroups().keySet();
|
||||||
|
|
||||||
// Maintain a listing of visited groups starting with each group in the snippet.
|
for (final ProcessGroupDTO group: highestProcessGroupDTO.getContents().getProcessGroups()) {
|
||||||
// This is used to determine whether a referenced controller service should be included in the resulting snippet.
|
if (snippetGroupIds.contains(group.getId())) {
|
||||||
// If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet
|
contentsByGroup.put(group.getId(), group);
|
||||||
// and will only be included when the includeControllerServices is set to true.
|
addChildren(group, contentsByGroup);
|
||||||
// This happens above when considering the processors in this snippet.
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final String childGroupId : snippet.getProcessGroups().keySet()) {
|
||||||
|
final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
|
||||||
|
if (childGroup == null) {
|
||||||
|
throw new IllegalStateException("A process group in this snippet could not be found.");
|
||||||
|
}
|
||||||
|
|
||||||
|
ProcessGroupDTO childGroupDto = contentsByGroup.get(childGroupId);
|
||||||
|
|
||||||
|
// maintain a listing of visited groups starting with each group in the snippet. this is used to determine
|
||||||
|
// whether a referenced controller service should be included in the resulting snippet. if the service is
|
||||||
|
// defined at groupId or one of it's ancestors, its considered outside of this snippet and will only be included
|
||||||
|
// when the includeControllerServices is set to true. this happens above when considering the processors in this snippet
|
||||||
final Set<String> visitedGroupIds = new HashSet<>();
|
final Set<String> visitedGroupIds = new HashSet<>();
|
||||||
for (final String groupIdentifier : contentsByGroup.keySet()) {
|
addControllerServices(childGroup, childGroupDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier());
|
||||||
|
|
||||||
// Include this group in the ancestry for this snippet, services only get included if the includeControllerServices
|
processGroups.add(childGroupDto);
|
||||||
// flag is set or if the service is defined within this groups hierarchy within the snippet.
|
|
||||||
if (!groupIdentifier.equals(processGroup.getIdentifier())) {
|
|
||||||
visitedGroupIds.add(groupIdentifier);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) {
|
|
||||||
// Include all referenced services that are not already included in this snippet.
|
|
||||||
getControllerServices(procNode.getEffectivePropertyValues()).stream()
|
|
||||||
.filter(allServicesReferenced::add)
|
|
||||||
.filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
|
|
||||||
.forEach(svc -> {
|
|
||||||
final String svcGroupId = svc.getParentGroupId();
|
|
||||||
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
|
|
||||||
svc.setParentGroupId(destinationGroupId);
|
|
||||||
final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId);
|
|
||||||
Set<ControllerServiceDTO> services = contents.getControllerServices();
|
|
||||||
if (services == null) {
|
|
||||||
contents.setControllerServices(Collections.singleton(svc));
|
|
||||||
} else {
|
|
||||||
services.add(svc);
|
|
||||||
contents.setControllerServices(services);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +278,7 @@ public final class SnippetUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Normalize the coordinates based on the locations of the other components
|
// Normalize the coordinates based on the locations of the other components
|
||||||
final List<? extends ComponentDTO> components = new ArrayList<>();
|
final List<? extends ComponentDTO> components = new ArrayList<>();
|
||||||
components.addAll((Set) processors);
|
components.addAll((Set) processors);
|
||||||
|
@ -297,7 +291,7 @@ public final class SnippetUtils {
|
||||||
components.addAll((Set) remoteProcessGroups);
|
components.addAll((Set) remoteProcessGroups);
|
||||||
normalizeCoordinates(components);
|
normalizeCoordinates(components);
|
||||||
|
|
||||||
Set<ControllerServiceDTO> updatedControllerServices = snippetDto.getControllerServices();
|
Set<ControllerServiceDTO> updatedControllerServices = contentsByGroup.get(processGroup.getIdentifier()).getContents().getControllerServices();
|
||||||
if (updatedControllerServices == null) {
|
if (updatedControllerServices == null) {
|
||||||
updatedControllerServices = new HashSet<>();
|
updatedControllerServices = new HashSet<>();
|
||||||
}
|
}
|
||||||
|
@ -316,10 +310,63 @@ public final class SnippetUtils {
|
||||||
return snippetDto;
|
return snippetDto;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map<String, FlowSnippetDTO> contentByGroupMap) {
|
private void addChildren(final ProcessGroupDTO processGroup, final Map<String, ProcessGroupDTO> contentByGroupMap) {
|
||||||
for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) {
|
for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) {
|
||||||
contentByGroupMap.put(group.getId(), group.getContents());
|
contentByGroupMap.put(group.getId(), group);
|
||||||
fillContentsByGroupMap(group, contentByGroupMap);
|
addChildren(group, contentByGroupMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and
|
||||||
|
* adds them to the given servicesByGroup map
|
||||||
|
*
|
||||||
|
* @param group the Process Group to start from
|
||||||
|
* @param contents Process Group's contents
|
||||||
|
* @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services
|
||||||
|
* @param contentsByGroup a Map of Process Group ID to the Process Group's contents
|
||||||
|
* @param highestGroupId the UUID of the 'highest' process group in the snippet
|
||||||
|
*/
|
||||||
|
private void addControllerServices(final ProcessGroup group, final FlowSnippetDTO contents, final Set<ControllerServiceDTO> allServicesReferenced,
|
||||||
|
final boolean includeControllerServices, final Set<String> visitedGroupIds, final Map<String, ProcessGroupDTO> contentsByGroup, final String highestGroupId) {
|
||||||
|
|
||||||
|
// include this group in the ancestry for this snippet, services only get included if the includeControllerServices
|
||||||
|
// flag is set or if the service is defined within this groups hierarchy within the snippet
|
||||||
|
visitedGroupIds.add(group.getIdentifier());
|
||||||
|
|
||||||
|
for (final ProcessorNode procNode : group.getProcessors()) {
|
||||||
|
// Include all referenced services that are not already included in this snippet.
|
||||||
|
getControllerServices(procNode.getEffectivePropertyValues()).stream()
|
||||||
|
.filter(allServicesReferenced::add)
|
||||||
|
.filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
|
||||||
|
.forEach(svc -> {
|
||||||
|
final String svcGroupId = svc.getParentGroupId();
|
||||||
|
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId;
|
||||||
|
svc.setParentGroupId(destinationGroupId);
|
||||||
|
final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId).getContents();
|
||||||
|
if (snippetDto != null) {
|
||||||
|
Set<ControllerServiceDTO> services = snippetDto.getControllerServices();
|
||||||
|
if (services == null) {
|
||||||
|
snippetDto.setControllerServices(Collections.singleton(svc));
|
||||||
|
} else {
|
||||||
|
services.add(svc);
|
||||||
|
snippetDto.setControllerServices(services);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Map child process group ID to the child process group for easy lookup
|
||||||
|
final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
|
||||||
|
.collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto));
|
||||||
|
|
||||||
|
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
||||||
|
final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier());
|
||||||
|
if (childDto == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
addControllerServices(childGroup, childDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,6 +392,7 @@ public final class SnippetUtils {
|
||||||
return serviceDtos;
|
return serviceDtos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
|
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
|
||||||
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
|
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
|
||||||
resolveNameConflicts(snippetCopy, group);
|
resolveNameConflicts(snippetCopy, group);
|
||||||
|
|
Loading…
Reference in New Issue