NIFI-3173: When a template is created with multiple components in different groups all referencing the same controller service, ensure that controller service is added to the template at a high enough level that all components needing hte service can access it.

- Ensure that controller services are added to child process groups when creating snippet

- Addressed issue related to modifying higher-level process groups' controller services in snippet after having already visited the process group

This closes #1318

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Mark Payne 2016-12-11 20:36:28 -05:00 committed by jpercivall
parent 0f462a7c49
commit 5776c4b1f9
1 changed files with 85 additions and 11 deletions

View File

@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -109,10 +110,48 @@ public final class SnippetUtils {
throw new IllegalStateException("The parent process group for this snippet could not be found."); throw new IllegalStateException("The parent process group for this snippet could not be found.");
} }
final Set<ControllerServiceDTO> controllerServices = new HashSet<>(); // We need to ensure that the Controller Services that are added get added to the proper group.
// This can potentially get a little bit tricky. Consider this scenario:
// We have a Process Group G1. Within Process Group G1 is a Controller Service C1.
// Also within G1 is a child Process Group, G2. Within G2 is a child Process Group, G3.
// Within G3 are two child Process Groups: G4 and G5. Within each of these children,
// we have a Processor (P1, P2) that references the Controller Service C1, defined 3 levels above.
// Now, we create a template that encompasses only Process Groups G4 and G5. We need to ensure
// that the Controller Service C1 is included at the 'root' of the template so that those
// Processors within G4 and G5 both have access to the same Controller Service. This can be drawn
// out thus:
//
// G1 -- C1
// |
// |
// G2
// |
// |
// G3
// | \
// | \
// G4 G5
// | |
// | |
// P1 P2
//
// Both P1 and P2 reference C1.
//
// In order to accomplish this, we maintain two collections. First, we keep a Set of all Controller Services that have
// been added. If we add a new Controller Service to the set, then we know it hasn't been added anywhere in the Snippet.
// In that case, we determine the service's group ID. In the flow described above, if we template just groups G4 and G5,
// then we need to include the Controller Service defined at G1. So we also keep a Map of Group ID to controller services
// in that group. If the ParentGroupId of a Controller Service is not in our snippet, then we instead update the parent
// ParentGroupId to be that of our highest-level process group (in this case G3, as that's where the template is created)
// and then add the controller services to that group (NOTE: here, when we say we change the group ID and add to that group,
// we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the
// Controller Services in our flow themselves!)
final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>(); final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>();
final Map<String, FlowSnippetDTO> contentsByGroup = new HashMap<>();
contentsByGroup.put(processGroup.getIdentifier(), snippetDto);
// add any processors // add any processors
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
final Set<ProcessorDTO> processors = new LinkedHashSet<>(); final Set<ProcessorDTO> processors = new LinkedHashSet<>();
if (!snippet.getProcessors().isEmpty()) { if (!snippet.getProcessors().isEmpty()) {
for (final String processorId : snippet.getProcessors().keySet()) { for (final String processorId : snippet.getProcessors().keySet()) {
@ -126,7 +165,12 @@ public final class SnippetUtils {
// Include all referenced services that are not already included in this snippet. // Include all referenced services that are not already included in this snippet.
getControllerServices(processor.getProperties()).stream() getControllerServices(processor.getProperties()).stream()
.filter(svc -> allServicesReferenced.add(svc)) .filter(svc -> allServicesReferenced.add(svc))
.forEach(svc -> controllerServices.add(svc)); .forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
svc.setParentGroupId(destinationGroupId);
controllerServices.add(svc);
});
} }
} }
} }
@ -203,7 +247,7 @@ public final class SnippetUtils {
final ProcessGroupDTO childGroupDto = dtoFactory.createProcessGroupDto(childGroup, recurse); final ProcessGroupDTO childGroupDto = dtoFactory.createProcessGroupDto(childGroup, recurse);
processGroups.add(childGroupDto); processGroups.add(childGroupDto);
addControllerServices(childGroup, childGroupDto, allServicesReferenced); addControllerServices(childGroup, childGroupDto, allServicesReferenced, contentsByGroup, processGroup.getIdentifier());
} }
} }
@ -219,6 +263,7 @@ public final class SnippetUtils {
} }
} }
// Normalize the coordinates based on the locations of the other components // Normalize the coordinates based on the locations of the other components
final List<? extends ComponentDTO> components = new ArrayList<>(); final List<? extends ComponentDTO> components = new ArrayList<>();
components.addAll((Set) processors); components.addAll((Set) processors);
@ -231,6 +276,13 @@ public final class SnippetUtils {
components.addAll((Set) remoteProcessGroups); components.addAll((Set) remoteProcessGroups);
normalizeCoordinates(components); normalizeCoordinates(components);
Set<ControllerServiceDTO> updatedControllerServices = snippetDto.getControllerServices();
if (updatedControllerServices == null) {
updatedControllerServices = new HashSet<>();
}
updatedControllerServices.addAll(controllerServices);
snippetDto.setControllerServices(updatedControllerServices);
snippetDto.setProcessors(processors); snippetDto.setProcessors(processors);
snippetDto.setConnections(connections); snippetDto.setConnections(connections);
snippetDto.setFunnels(funnels); snippetDto.setFunnels(funnels);
@ -240,27 +292,49 @@ public final class SnippetUtils {
snippetDto.setProcessGroups(processGroups); snippetDto.setProcessGroups(processGroups);
snippetDto.setRemoteProcessGroups(remoteProcessGroups); snippetDto.setRemoteProcessGroups(remoteProcessGroups);
snippetDto.setControllerServices(controllerServices);
return snippetDto; return snippetDto;
} }
private void addControllerServices(final ProcessGroup group, final ProcessGroupDTO dto, final Set<ControllerServiceDTO> allServicesReferenced) { /**
* Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and
* adds them to the given servicesByGroup map
*
* @param group the Process Group to start from
* @param dto the DTO representation of the Process Group
* @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services
* @param contentsByGroup a Map of Process Group ID to the Process Group's contents
* @param highestGroupId the UUID of the 'highest' process group in the snippet
*/
private void addControllerServices(final ProcessGroup group, final ProcessGroupDTO dto, final Set<ControllerServiceDTO> allServicesReferenced,
final Map<String, FlowSnippetDTO> contentsByGroup, final String highestGroupId) {
final FlowSnippetDTO contents = dto.getContents(); final FlowSnippetDTO contents = dto.getContents();
contentsByGroup.put(dto.getId(), contents);
if (contents == null) { if (contents == null) {
return; return;
} }
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
for (final ProcessorNode procNode : group.getProcessors()) { for (final ProcessorNode procNode : group.getProcessors()) {
// Include all referenced services that are not already included in this snippet. // Include all referenced services that are not already included in this snippet.
getControllerServices(procNode.getProperties()).stream() getControllerServices(procNode.getProperties()).stream()
.filter(svc -> allServicesReferenced.add(svc)) .filter(svc -> allServicesReferenced.add(svc))
.forEach(svc -> controllerServices.add(svc)); .forEach(svc -> {
final String svcGroupId = svc.getParentGroupId();
final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId;
svc.setParentGroupId(destinationGroupId);
final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId);
if (snippetDto != null) {
Set<ControllerServiceDTO> services = snippetDto.getControllerServices();
if (services == null) {
snippetDto.setControllerServices(Collections.singleton(svc));
} else {
services.add(svc);
snippetDto.setControllerServices(services);
}
}
});
} }
contents.setControllerServices(controllerServices);
// Map child process group ID to the child process group for easy lookup // Map child process group ID to the child process group for easy lookup
final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream() final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
@ -272,7 +346,7 @@ public final class SnippetUtils {
continue; continue;
} }
addControllerServices(childGroup, childDto, allServicesReferenced); addControllerServices(childGroup, childDto, allServicesReferenced, contentsByGroup, highestGroupId);
} }
} }