NIFI-2347: - Ensuring component specific policies are retained when using copy/paste. - This includes the policies for the component, data of the component, data transfers of the component, and policies of the component.

This closes #730

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Matt Gilman 2016-07-27 21:24:46 -04:00 committed by jpercivall
parent 25cadf5db1
commit 09b124714e
3 changed files with 421 additions and 213 deletions

View File

@ -92,10 +92,19 @@ public class StandardSnippetDAO implements SnippetDAO {
org.apache.nifi.util.SnippetUtils.moveSnippet(snippetContents, originX, originY);
}
// instantiate the snippet
flowController.instantiateSnippet(processGroup, snippetContents);
try {
// instantiate the snippet and return the contents
flowController.instantiateSnippet(processGroup, snippetContents);
return snippetContents;
} catch (IllegalStateException ise) {
// illegal state will be thrown from instantiateSnippet when there is an issue with the snippet _before_ any of the
// components are actually created. if we've received this exception we want to attempt to roll back any of the
// policies that we've already cloned for this request
snippetUtils.rollbackClonedPolicies(snippetContents);
return snippetContents;
// rethrow the same exception
throw ise;
}
} catch (ProcessorInstantiationException pie) {
throw new NiFiCoreException(String.format("Unable to copy snippet because processor type '%s' is unknown to this NiFi.",
StringUtils.substringAfterLast(pie.getMessage(), ".")));

View File

@ -16,19 +16,12 @@
*/
package org.apache.nifi.web.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@ -44,6 +37,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.util.TypeOneUUIDGenerator;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@ -59,15 +53,34 @@ import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Template utilities.
*/
public final class SnippetUtils {
private static final Logger logger = LoggerFactory.getLogger(SnippetUtils.class);
private FlowController flowController;
private DtoFactory dtoFactory;
private AccessPolicyDAO accessPolicyDAO;
/**
@ -262,8 +275,7 @@ public final class SnippetUtils {
}
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group,
final String idGenerationSeed, boolean isCopy) {
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
resolveNameConflicts(snippetCopy, group);
return snippetCopy;
@ -319,210 +331,392 @@ public final class SnippetUtils {
}
}
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap,
final String idGenerationSeed, boolean isCopy) {
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap,
Map<String, String> serviceIdMap, final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetContentsCopy = new FlowSnippetDTO();
//
// Copy the Controller Services
//
if (serviceIdMap == null) {
serviceIdMap = new HashMap<>();
}
final Set<ControllerServiceDTO> services = new HashSet<>();
if (snippetContents.getControllerServices() != null) {
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
service.setId(generateId(serviceDTO.getId(), idGenerationSeed, isCopy));
service.setState(ControllerServiceState.DISABLED.name());
services.add(service);
// Map old service ID to new service ID so that we can make sure that we reference the new ones.
serviceIdMap.put(serviceDTO.getId(), service.getId());
try {
//
// Copy the Controller Services
//
if (serviceIdMap == null) {
serviceIdMap = new HashMap<>();
}
}
// if there is any controller service that maps to another controller service, update the id's
for (final ControllerServiceDTO serviceDTO : services) {
final Map<String, String> properties = serviceDTO.getProperties();
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
if (properties != null && descriptors != null) {
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
if (descriptor.getIdentifiesControllerService() != null) {
final String currentServiceId = properties.get(descriptor.getName());
if (currentServiceId == null) {
continue;
final Set<ControllerServiceDTO> services = new HashSet<>();
if (snippetContents.getControllerServices() != null) {
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
service.setId(generateId(serviceDTO.getId(), idGenerationSeed, isCopy));
service.setState(ControllerServiceState.DISABLED.name());
services.add(service);
// Map old service ID to new service ID so that we can make sure that we reference the new ones.
serviceIdMap.put(serviceDTO.getId(), service.getId());
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.ControllerService, serviceDTO.getId(), serviceDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.ControllerService, service.getId(), service.getName()), idGenerationSeed);
}
}
}
// if there is any controller service that maps to another controller service, update the id's
for (final ControllerServiceDTO serviceDTO : services) {
final Map<String, String> properties = serviceDTO.getProperties();
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
if (properties != null && descriptors != null) {
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
if (descriptor.getIdentifiesControllerService() != null) {
final String currentServiceId = properties.get(descriptor.getName());
if (currentServiceId == null) {
continue;
}
final String newServiceId = serviceIdMap.get(currentServiceId);
properties.put(descriptor.getName(), newServiceId);
}
}
}
}
snippetContentsCopy.setControllerServices(services);
final String newServiceId = serviceIdMap.get(currentServiceId);
properties.put(descriptor.getName(), newServiceId);
//
// Copy the labels
//
final Set<LabelDTO> labels = new HashSet<>();
if (snippetContents.getLabels() != null) {
for (final LabelDTO labelDTO : snippetContents.getLabels()) {
final LabelDTO label = dtoFactory.copy(labelDTO);
label.setId(generateId(labelDTO.getId(), idGenerationSeed, isCopy));
label.setParentGroupId(groupId);
labels.add(label);
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.Label, labelDTO.getId(), labelDTO.getLabel()),
ResourceFactory.getComponentResource(ResourceType.Label, label.getId(), label.getLabel()), idGenerationSeed);
}
}
}
snippetContentsCopy.setLabels(labels);
//
// Copy connectable components
//
// maps a group ID-ID of a Connectable in the template to the new instance
final Map<String, ConnectableDTO> connectableMap = new HashMap<>();
//
// Copy the funnels
//
final Set<FunnelDTO> funnels = new HashSet<>();
if (snippetContents.getFunnels() != null) {
for (final FunnelDTO funnelDTO : snippetContents.getFunnels()) {
final FunnelDTO cp = dtoFactory.copy(funnelDTO);
cp.setId(generateId(funnelDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
funnels.add(cp);
connectableMap.put(funnelDTO.getParentGroupId() + "-" + funnelDTO.getId(), dtoFactory.createConnectableDto(cp));
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.Funnel, funnelDTO.getId(), funnelDTO.getId()),
ResourceFactory.getComponentResource(ResourceType.Funnel, cp.getId(), cp.getId()), idGenerationSeed);
}
}
}
snippetContentsCopy.setFunnels(funnels);
final Set<PortDTO> inputPorts = new HashSet<>();
if (snippetContents.getInputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getInputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
inputPorts.add(cp);
final ConnectableDTO portConnectable = dtoFactory.createConnectableDto(cp, ConnectableType.INPUT_PORT);
connectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
if (parentConnectableMap != null) {
parentConnectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
}
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.InputPort, portDTO.getId(), portDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.InputPort, cp.getId(), cp.getName()), idGenerationSeed);
}
}
}
snippetContentsCopy.setInputPorts(inputPorts);
final Set<PortDTO> outputPorts = new HashSet<>();
if (snippetContents.getOutputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getOutputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
outputPorts.add(cp);
final ConnectableDTO portConnectable = dtoFactory.createConnectableDto(cp, ConnectableType.OUTPUT_PORT);
connectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
if (parentConnectableMap != null) {
parentConnectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
}
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.OutputPort, portDTO.getId(), portDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.OutputPort, cp.getId(), cp.getName()), idGenerationSeed);
}
}
}
snippetContentsCopy.setOutputPorts(outputPorts);
//
// Copy the processors
//
final Set<ProcessorDTO> processors = new HashSet<>();
if (snippetContents.getProcessors() != null) {
for (final ProcessorDTO processorDTO : snippetContents.getProcessors()) {
final ProcessorDTO cp = dtoFactory.copy(processorDTO);
cp.setId(generateId(processorDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
processors.add(cp);
connectableMap.put(processorDTO.getParentGroupId() + "-" + processorDTO.getId(), dtoFactory.createConnectableDto(cp));
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.Processor, processorDTO.getId(), processorDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.Processor, cp.getId(), cp.getName()), idGenerationSeed);
}
}
}
snippetContentsCopy.setProcessors(processors);
// if there is any controller service that maps to another controller service, update the id's
updateControllerServiceIdentifiers(snippetContentsCopy, serviceIdMap);
//
// Copy ProcessGroups
//
// instantiate the process groups, renaming as necessary
final Set<ProcessGroupDTO> groups = new HashSet<>();
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
final ProcessGroupDTO cp = dtoFactory.copy(groupDTO, false);
cp.setId(generateId(groupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
// copy the contents of this group - we do not copy via the dto factory since we want to specify new ids
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap, idGenerationSeed, isCopy);
cp.setContents(contentsCopy);
groups.add(cp);
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.ProcessGroup, groupDTO.getId(), groupDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.ProcessGroup, cp.getId(), cp.getName()), idGenerationSeed);
}
}
}
snippetContentsCopy.setProcessGroups(groups);
final Set<RemoteProcessGroupDTO> remoteGroups = new HashSet<>();
if (snippetContents.getRemoteProcessGroups() != null) {
for (final RemoteProcessGroupDTO remoteGroupDTO : snippetContents.getRemoteProcessGroups()) {
final RemoteProcessGroupDTO cp = dtoFactory.copy(remoteGroupDTO);
cp.setId(generateId(remoteGroupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
final RemoteProcessGroupContentsDTO contents = cp.getContents();
if (contents != null && contents.getInputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getInputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT));
}
}
if (contents != null && contents.getOutputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getOutputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT));
}
}
remoteGroups.add(cp);
// clone policies as appropriate
if (isCopy) {
cloneComponentSpecificPolicies(
ResourceFactory.getComponentResource(ResourceType.RemoteProcessGroup, remoteGroupDTO.getId(), remoteGroupDTO.getName()),
ResourceFactory.getComponentResource(ResourceType.RemoteProcessGroup, cp.getId(), cp.getName()), idGenerationSeed);
}
}
}
snippetContentsCopy.setRemoteProcessGroups(remoteGroups);
final Set<ConnectionDTO> connections = new HashSet<>();
if (snippetContents.getConnections() != null) {
for (final ConnectionDTO connectionDTO : snippetContents.getConnections()) {
final ConnectionDTO cp = dtoFactory.copy(connectionDTO);
final ConnectableDTO source = connectableMap.get(cp.getSource().getGroupId() + "-" + cp.getSource().getId());
final ConnectableDTO destination = connectableMap.get(cp.getDestination().getGroupId() + "-" + cp.getDestination().getId());
// ensure all referenced components are present
if (source == null || destination == null) {
throw new IllegalArgumentException("The flow snippet contains a Connection that references a component that is not included.");
}
cp.setId(generateId(connectionDTO.getId(), idGenerationSeed, isCopy));
cp.setSource(source);
cp.setDestination(destination);
cp.setParentGroupId(groupId);
connections.add(cp);
// note - no need to copy policies of a connection as their permissions are inferred through the source and destination
}
}
snippetContentsCopy.setConnections(connections);
return snippetContentsCopy;
} catch (Exception e) {
// attempt to role back any policies of the copies that were created in preparation for the clone
rollbackClonedPolicies(snippetContentsCopy);
// rethrow the original exception
throw e;
}
}
/**
* Clones all the component specified policies for the specified original component. This will include the component resource, data resource
* for the component, data transfer resource for the component, and policy resource for the component.
*
* @param originalComponentResource original component resource
* @param clonedComponentResource cloned component resource
* @param idGenerationSeed id generation seed
*/
private void cloneComponentSpecificPolicies(final Resource originalComponentResource, final Resource clonedComponentResource, final String idGenerationSeed) {
final Map<Resource, Resource> resources = new HashMap<>();
resources.put(originalComponentResource, clonedComponentResource);
resources.put(ResourceFactory.getDataResource(originalComponentResource), ResourceFactory.getDataResource(clonedComponentResource));
resources.put(ResourceFactory.getDataTransferResource(originalComponentResource), ResourceFactory.getDataTransferResource(clonedComponentResource));
resources.put(ResourceFactory.getPolicyResource(originalComponentResource), ResourceFactory.getPolicyResource(clonedComponentResource));
for (final Entry<Resource, Resource> entry : resources.entrySet()) {
final Resource originalResource = entry.getKey();
final Resource cloneResource = entry.getValue();
for (final RequestAction action : RequestAction.values()) {
final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(action, originalResource.getIdentifier());
// if there is a component specific policy we want to clone it for the new component
if (accessPolicy != null) {
final AccessPolicyDTO cloneAccessPolicy = new AccessPolicyDTO();
cloneAccessPolicy.setId(generateId(accessPolicy.getIdentifier(), idGenerationSeed, true));
cloneAccessPolicy.setAction(accessPolicy.getAction().toString());
cloneAccessPolicy.setResource(cloneResource.getIdentifier());
final Set<TenantEntity> users = new HashSet<>();
accessPolicy.getUsers().forEach(userId -> {
final TenantEntity entity = new TenantEntity();
entity.setId(userId);
users.add(entity);
});
cloneAccessPolicy.setUsers(users);
final Set<TenantEntity> groups = new HashSet<>();
accessPolicy.getGroups().forEach(groupId -> {
final TenantEntity entity = new TenantEntity();
entity.setId(groupId);
groups.add(entity);
});
cloneAccessPolicy.setUserGroups(groups);
// create the access policy for the cloned policy
accessPolicyDAO.createAccessPolicy(cloneAccessPolicy);
}
}
}
}
/**
* Attempts to roll back and in the specified snippet.
*
* @param snippet snippet
*/
public void rollbackClonedPolicies(final FlowSnippetDTO snippet) {
snippet.getControllerServices().forEach(controllerServiceDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.ControllerService, controllerServiceDTO.getId(), controllerServiceDTO.getName()));
});
snippet.getFunnels().forEach(funnelDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.Funnel, funnelDTO.getId(), funnelDTO.getId()));
});
snippet.getInputPorts().forEach(inputPortDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.InputPort, inputPortDTO.getId(), inputPortDTO.getName()));
});
snippet.getLabels().forEach(labelDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.Label, labelDTO.getId(), labelDTO.getLabel()));
});
snippet.getOutputPorts().forEach(outputPortDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.OutputPort, outputPortDTO.getId(), outputPortDTO.getName()));
});
snippet.getProcessors().forEach(processorDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.Processor, processorDTO.getId(), processorDTO.getName()));
});
snippet.getRemoteProcessGroups().forEach(remoteProcessGroupDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.RemoteProcessGroup, remoteProcessGroupDTO.getId(), remoteProcessGroupDTO.getName()));
});
snippet.getProcessGroups().forEach(processGroupDTO -> {
rollbackClonedPolicy(ResourceFactory.getComponentResource(ResourceType.ProcessGroup, processGroupDTO.getId(), processGroupDTO.getName()));
// consider all descendant components
if (processGroupDTO.getContents() != null) {
rollbackClonedPolicies(processGroupDTO.getContents());
}
});
}
/**
* Attempts to roll back all policies for the specified component. This includes the component resource, data resource
* for the component, data transfer resource for the component, and policy resource for the component.
*
* @param componentResource component resource
*/
private void rollbackClonedPolicy(final Resource componentResource) {
final List<Resource> resources = new ArrayList<>();
resources.add(componentResource);
resources.add(ResourceFactory.getDataResource(componentResource));
resources.add(ResourceFactory.getDataTransferResource(componentResource));
resources.add(ResourceFactory.getPolicyResource(componentResource));
for (final Resource resource : resources) {
for (final RequestAction action : RequestAction.values()) {
final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(action, resource.getIdentifier());
if (accessPolicy != null) {
try {
accessPolicyDAO.deleteAccessPolicy(accessPolicy.getIdentifier());
} catch (final Exception e) {
logger.warn(String.format("Unable to clean up cloned access policy for %s %s after failed copy/paste action.", action, componentResource.getIdentifier()), e);
}
}
}
}
snippetContentsCopy.setControllerServices(services);
//
// Copy the labels
//
final Set<LabelDTO> labels = new HashSet<>();
if (snippetContents.getLabels() != null) {
for (final LabelDTO labelDTO : snippetContents.getLabels()) {
final LabelDTO label = dtoFactory.copy(labelDTO);
label.setId(generateId(labelDTO.getId(), idGenerationSeed, isCopy));
label.setParentGroupId(groupId);
labels.add(label);
}
}
snippetContentsCopy.setLabels(labels);
//
// Copy connectable components
//
// maps a group ID-ID of a Connectable in the template to the new instance
final Map<String, ConnectableDTO> connectableMap = new HashMap<>();
//
// Copy the funnels
//
final Set<FunnelDTO> funnels = new HashSet<>();
if (snippetContents.getFunnels() != null) {
for (final FunnelDTO funnelDTO : snippetContents.getFunnels()) {
final FunnelDTO cp = dtoFactory.copy(funnelDTO);
cp.setId(generateId(funnelDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
funnels.add(cp);
connectableMap.put(funnelDTO.getParentGroupId() + "-" + funnelDTO.getId(), dtoFactory.createConnectableDto(cp));
}
}
snippetContentsCopy.setFunnels(funnels);
final Set<PortDTO> inputPorts = new HashSet<>();
if (snippetContents.getInputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getInputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
inputPorts.add(cp);
final ConnectableDTO portConnectable = dtoFactory.createConnectableDto(cp, ConnectableType.INPUT_PORT);
connectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
if (parentConnectableMap != null) {
parentConnectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
}
}
}
snippetContentsCopy.setInputPorts(inputPorts);
final Set<PortDTO> outputPorts = new HashSet<>();
if (snippetContents.getOutputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getOutputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
outputPorts.add(cp);
final ConnectableDTO portConnectable = dtoFactory.createConnectableDto(cp, ConnectableType.OUTPUT_PORT);
connectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
if (parentConnectableMap != null) {
parentConnectableMap.put(portDTO.getParentGroupId() + "-" + portDTO.getId(), portConnectable);
}
}
}
snippetContentsCopy.setOutputPorts(outputPorts);
//
// Copy the processors
//
final Set<ProcessorDTO> processors = new HashSet<>();
if (snippetContents.getProcessors() != null) {
for (final ProcessorDTO processorDTO : snippetContents.getProcessors()) {
final ProcessorDTO cp = dtoFactory.copy(processorDTO);
cp.setId(generateId(processorDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
processors.add(cp);
connectableMap.put(processorDTO.getParentGroupId() + "-" + processorDTO.getId(), dtoFactory.createConnectableDto(cp));
}
}
snippetContentsCopy.setProcessors(processors);
// if there is any controller service that maps to another controller service, update the id's
updateControllerServiceIdentifiers(snippetContentsCopy, serviceIdMap);
//
// Copy ProcessGroups
//
// instantiate the process groups, renaming as necessary
final Set<ProcessGroupDTO> groups = new HashSet<>();
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
final ProcessGroupDTO cp = dtoFactory.copy(groupDTO, false);
cp.setId(generateId(groupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
// copy the contents of this group - we do not copy via the dto factory since we want to specify new ids
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap, idGenerationSeed, isCopy);
cp.setContents(contentsCopy);
groups.add(cp);
}
}
snippetContentsCopy.setProcessGroups(groups);
final Set<RemoteProcessGroupDTO> remoteGroups = new HashSet<>();
if (snippetContents.getRemoteProcessGroups() != null) {
for (final RemoteProcessGroupDTO remoteGroupDTO : snippetContents.getRemoteProcessGroups()) {
final RemoteProcessGroupDTO cp = dtoFactory.copy(remoteGroupDTO);
cp.setId(generateId(remoteGroupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
final RemoteProcessGroupContentsDTO contents = cp.getContents();
if (contents != null && contents.getInputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getInputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT));
}
}
if (contents != null && contents.getOutputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getOutputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT));
}
}
remoteGroups.add(cp);
}
}
snippetContentsCopy.setRemoteProcessGroups(remoteGroups);
final Set<ConnectionDTO> connections = new HashSet<>();
if (snippetContents.getConnections() != null) {
for (final ConnectionDTO connectionDTO : snippetContents.getConnections()) {
final ConnectionDTO cp = dtoFactory.copy(connectionDTO);
final ConnectableDTO source = connectableMap.get(cp.getSource().getGroupId() + "-" + cp.getSource().getId());
final ConnectableDTO destination = connectableMap.get(cp.getDestination().getGroupId() + "-" + cp.getDestination().getId());
// ensure all referenced components are present
if (source == null || destination == null) {
throw new IllegalArgumentException("The flow snippet contains a Connection that references a component that is not included.");
}
cp.setId(generateId(connectionDTO.getId(), idGenerationSeed, isCopy));
cp.setSource(source);
cp.setDestination(destination);
cp.setParentGroupId(groupId);
connections.add(cp);
}
}
snippetContentsCopy.setConnections(connections);
return snippetContentsCopy;
}
private void updateControllerServiceIdentifiers(final FlowSnippetDTO snippet, final Map<String, String> serviceIdMap) {
@ -585,6 +779,10 @@ public final class SnippetUtils {
this.flowController = flowController;
}
public void setAccessPolicyDAO(AccessPolicyDAO accessPolicyDAO) {
this.accessPolicyDAO = accessPolicyDAO;
}
/**
* Will normalize the coordinates of the processors to ensure their
* consistency across exports. It will do so by fist calculating the

View File

@ -58,6 +58,7 @@
<bean id="snippetUtils" class="org.apache.nifi.web.util.SnippetUtils">
<property name="dtoFactory" ref="dtoFactory"/>
<property name="flowController" ref="flowController"/>
<property name="accessPolicyDAO" ref="policyBasedAuthorizerDAO"/>
</bean>
<!-- nifi component dao initialization -->