From 058883091cf2cae0ff344136d6d58dd8efe0fd6c Mon Sep 17 00:00:00 2001 From: Joe Ferner Date: Tue, 19 Nov 2019 10:15:42 -0500 Subject: [PATCH] NIFI-6872: - Added UI versioned flow supportsDownload functionality with download flow menu item - Added VersionsResource endpoint for downloading versioned flow with registry-related info removed - Added ProcessGroupResource endpoint for downloading current flow with registry-related info removed - Added StandardNifiServiceFacade functionality for downloading both current and versioned flow - Added XmlTransient markers on variables introduced by Instantiated model classes so they do not appear in serialized download - Updated NiFiRegistryFlowMapper.mapParameterContexts to handle mapping nested parameter contexts for use in producing a complete VersionedFlowSnapshot - Added ability for NiFiRegistryFlowMapper to map nested process groups ignoring versioning for use in producing a complete VersionedFlowSnapshot - Added unit tests where helpful NIFI-6872: PR response... - Updated mapParameterContext to return a Map to handle uniqueness of contexts by name since ultimately everything converted it to a map anyway. The VersionedParameterContext class from the registry model doesn't support hashcode/equals currently so returning a Set wouldn't work. - Updated assert calls to put expected value as first parameter and actual as second parameter - Added one time password (OTP) support for flow download endpoint to support non cert based authentication This closes #3931 --- .../web/api/entity/VersionedFlowEntity.java | 4 +- .../nifi/registry/flow/FlowRegistry.java | 5 +- .../registry/flow/RestBasedFlowRegistry.java | 14 +- .../InstantiatedVersionedComponent.java | 6 + .../InstantiatedVersionedProcessGroup.java | 4 + .../flow/mapping/NiFiRegistryFlowMapper.java | 317 ++++--- .../MockSingleFlowRegistryClient.java | 3 +- .../mapping/NiFiRegistryFlowMapperTest.java | 780 ++++++++++++++++++ .../apache/nifi/web/NiFiServiceFacade.java | 26 +- .../nifi/web/StandardNiFiServiceFacade.java | 134 ++- .../nifi/web/api/ApplicationResource.java | 22 +- .../nifi/web/api/ProcessGroupResource.java | 45 + .../apache/nifi/web/api/VersionsResource.java | 61 ++ .../web/StandardNiFiServiceFacadeTest.java | 78 +- .../web/api/TestProcessGroupResource.java | 62 ++ .../nifi/web/api/TestVersionsResource.java | 81 ++ .../security/otp/OtpAuthenticationFilter.java | 5 +- .../main/webapp/js/nf/canvas/nf-actions.js | 42 +- .../webapp/js/nf/canvas/nf-context-menu.js | 185 ++--- 19 files changed, 1535 insertions(+), 339 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java index 26772ad7c3..c6c19f3291 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java @@ -31,7 +31,7 @@ public class VersionedFlowEntity extends Entity { return versionedFlow; } - public void setVersionedFlow(VersionedFlowDTO versionedFLow) { - this.versionedFlow = versionedFLow; + public void setVersionedFlow(VersionedFlowDTO versionedFlow) { + this.versionedFlow = versionedFlow; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 92fc8a1cbd..f1d09958c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -23,7 +23,6 @@ import org.apache.nifi.registry.client.NiFiRegistryException; import java.io.IOException; import java.util.Map; -import java.util.Collection; import java.util.Set; public interface FlowRegistry { @@ -134,7 +133,7 @@ public interface FlowRegistry { * @param snapshot the snapshot of the flow * @param externalControllerServices a mapping of of Controller Service identifier to ExternalControllerServiceReference for any Controller Service that is referenced by the flow but that are * not included as part of the VersionedProcessGroup - * @param parameterContexts the Parameter Contexts to include in the snapshot + * @param parameterContexts a map of the Parameter Contexts to include in the snapshot keyed by name * @param comments any comments for the snapshot * @param expectedVersion the version of the flow that we expect to save this snapshot as * @return the versioned flow snapshot @@ -144,7 +143,7 @@ public interface FlowRegistry { * @throws NiFiRegistryException if the flow does not exist */ VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, Map externalControllerServices, - Collection parameterContexts, String comments, + Map parameterContexts, String comments, int expectedVersion, NiFiUser user) throws IOException, NiFiRegistryException; /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index a7f87d2f8d..35692e6044 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -29,14 +29,12 @@ import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient; import javax.net.ssl.SSLContext; import java.io.IOException; -import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class RestBasedFlowRegistry implements FlowRegistry { - private static final String FLOW_ENCODING_VERSION = "1.0"; + public static final String FLOW_ENCODING_VERSION = "1.0"; private final FlowRegistryClient flowRegistryClient; private final String identifier; @@ -179,17 +177,13 @@ public class RestBasedFlowRegistry implements FlowRegistry { @Override public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final Map externalControllerServices, - Collection parameterContexts, final String comments, final int expectedVersion, - final NiFiUser user) throws IOException, NiFiRegistryException { - - final Map parameterContextMap = parameterContexts.stream() - .collect(Collectors.toMap(VersionedParameterContext::getName, context -> context)); - + final Map parameterContexts, final String comments, + final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException { final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); versionedFlowSnapshot.setFlowContents(snapshot); versionedFlowSnapshot.setExternalControllerServices(externalControllerServices); - versionedFlowSnapshot.setParameterContexts(parameterContextMap); + versionedFlowSnapshot.setParameterContexts(parameterContexts); versionedFlowSnapshot.setFlowEncodingVersion(FLOW_ENCODING_VERSION); final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java index 15c620a3ed..e655c16d1a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java @@ -17,8 +17,14 @@ package org.apache.nifi.registry.flow.mapping; +import javax.xml.bind.annotation.XmlTransient; + public interface InstantiatedVersionedComponent { + + // mark transient so fields are ignored when serializing all versioned component types + @XmlTransient String getInstanceId(); + @XmlTransient String getInstanceGroupId(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java index 3eed715b0d..967eec3697 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java @@ -20,9 +20,11 @@ package org.apache.nifi.registry.flow.mapping; import org.apache.nifi.registry.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.VersionedProcessGroup; +import javax.xml.bind.annotation.XmlTransient; import java.util.Map; public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup implements InstantiatedVersionedComponent { + private final String instanceId; private final String groupId; private Map externalControllerServiceReferences; @@ -46,6 +48,8 @@ public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup imp this.externalControllerServiceReferences = externalControllerServiceReferences; } + // mark transient so field is ignored when serializing this class + @XmlTransient public Map getExternalControllerServiceReferences() { return externalControllerServiceReferences; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 67be279ef4..2488adfa77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -27,8 +27,8 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -80,6 +80,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -90,7 +91,7 @@ public class NiFiRegistryFlowMapper { // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' - // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been + // identifier based on the component's actual id. We do connections last, so that all components will already have been // created before attempting to create the connection, where the ConnectableDTO is converted. private Map versionedComponentIds = new HashMap<>(); @@ -98,14 +99,157 @@ public class NiFiRegistryFlowMapper { this.extensionManager = extensionManager; } - public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, + /** + * Map the given process group to a versioned process group without any use of an actual flow registry even if the + * group is currently versioned in a registry. + * + * @param group the process group to map + * @param serviceProvider the controller service provider to use for mapping + * @return a complete versioned process group without any registry related details + */ + public InstantiatedVersionedProcessGroup mapNonVersionedProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider) { + versionedComponentIds.clear(); + + // always include descendant flows and do not apply any registry versioning info that may be present in the group + return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> true); + } + + /** + * Map the given process group to a versioned process group using the provided registry client. + * + * @param group the process group to map + * @param serviceProvider the controller service provider to use for mapping + * @param registryClient the registry client to use when retrieving versioning details + * @param mapDescendantVersionedFlows true in order to include descendant flows in the mapped result + * @return a complete versioned process group with applicable registry related details + */ + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, + final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) { versionedComponentIds.clear(); - final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows); - populateReferencedAncestorVariables(group, mapped); + // apply registry versioning according to the lambda below + // NOTE: lambda refers to registry client and map descendant boolean which will not change during recursion + return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> { + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo != null) { + final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates(); + final String registryId = versionControlInfo.getRegistryIdentifier(); + final FlowRegistry registry = registryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL."); + } - return mapped; + coordinates.setRegistryUrl(registry.getURL()); + coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); + coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); + coordinates.setVersion(versionControlInfo.getVersion()); + versionedGroup.setVersionedFlowCoordinates(coordinates); + + // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports. + // Otherwise, we will not be able to lookup the port when connecting to it. + for (final Port port : processGroup.getInputPorts()) { + getId(port.getVersionedComponentId(), port.getIdentifier()); + } + for (final Port port : processGroup.getOutputPorts()) { + getId(port.getVersionedComponentId(), port.getIdentifier()); + } + + // If the Process Group itself is remotely versioned, then we don't want to include its contents + // because the contents are remotely managed and not part of the versioning of this Process Group + return mapDescendantVersionedFlows; + } + return true; + }); + } + + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, + final BiFunction applyVersionControlInfo) { + final Set allIncludedGroupsIds = group.findAllProcessGroups().stream() + .map(ProcessGroup::getIdentifier) + .collect(Collectors.toSet()); + allIncludedGroupsIds.add(group.getIdentifier()); + + final Map externalControllerServiceReferences = new HashMap<>(); + final InstantiatedVersionedProcessGroup versionedGroup = + mapGroup(group, serviceProvider, applyVersionControlInfo, true, allIncludedGroupsIds, externalControllerServiceReferences); + + populateReferencedAncestorVariables(group, versionedGroup); + + return versionedGroup; + } + + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, + final BiFunction applyVersionControlInfo, + final boolean topLevel, final Set includedGroupIds, + final Map externalControllerServiceReferences) { + + final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); + versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); + versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); + versionedGroup.setName(group.getName()); + versionedGroup.setComments(group.getComments()); + versionedGroup.setPosition(mapPosition(group.getPosition())); + + final ParameterContext parameterContext = group.getParameterContext(); + versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName()); + + // If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for, + // then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used + // only for a child group that is itself version controlled. + if (!topLevel) { + final boolean mapDescendantVersionedFlows = applyVersionControlInfo.apply(group, versionedGroup); + + // return here if we do not want to include remotely managed descendant flows + if (!mapDescendantVersionedFlows) { + return versionedGroup; + } + } + + versionedGroup.setControllerServices(group.getControllerServices(false).stream() + .map(service -> mapControllerService(service, serviceProvider, includedGroupIds, externalControllerServiceReferences)) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setFunnels(group.getFunnels().stream() + .map(this::mapFunnel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setInputPorts(group.getInputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setOutputPorts(group.getOutputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setLabels(group.getLabels().stream() + .map(this::mapLabel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessors(group.getProcessors().stream() + .map(processor -> mapProcessor(processor, serviceProvider, includedGroupIds, externalControllerServiceReferences)) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() + .map(this::mapRemoteProcessGroup) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessGroups(group.getProcessGroups().stream() + .map(grp -> mapGroup(grp, serviceProvider, applyVersionControlInfo, false, includedGroupIds, externalControllerServiceReferences)) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setConnections(group.getConnections().stream() + .map(this::mapConnection) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue))); + + if (topLevel) { + versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences); + } + + return versionedGroup; } private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { @@ -138,121 +282,12 @@ public class NiFiRegistryFlowMapper { } group.getVariableRegistry().getVariableMap().keySet().stream() - .map(VariableDescriptor::getName) - .forEach(variableNames::add); + .map(VariableDescriptor::getName) + .forEach(variableNames::add); populateVariableNames(group.getParent(), variableNames); } - private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, - final boolean topLevel, final boolean mapDescendantVersionedFlows) { - - final Set allIncludedGroupsIds = group.findAllProcessGroups().stream() - .map(ProcessGroup::getIdentifier) - .collect(Collectors.toSet()); - allIncludedGroupsIds.add(group.getIdentifier()); - - final Map externalControllerServiceReferences = new HashMap<>(); - return mapGroup(group, serviceProvider, registryClient, topLevel, mapDescendantVersionedFlows, allIncludedGroupsIds, externalControllerServiceReferences); - } - - - private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, - final boolean topLevel, final boolean mapDescendantVersionedFlows, final Set includedGroupIds, - final Map externalControllerServiceReferences) { - - final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); - versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); - versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); - versionedGroup.setName(group.getName()); - versionedGroup.setComments(group.getComments()); - versionedGroup.setPosition(mapPosition(group.getPosition())); - - final ParameterContext parameterContext = group.getParameterContext(); - versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName()); - - // If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for, - // then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used - // only for a child group that is itself version controlled. - if (!topLevel) { - final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); - if (versionControlInfo != null) { - final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates(); - final String registryId = versionControlInfo.getRegistryIdentifier(); - final FlowRegistry registry = registryClient.getFlowRegistry(registryId); - if (registry == null) { - throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL."); - } - - coordinates.setRegistryUrl(registry.getURL()); - coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); - coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); - coordinates.setVersion(versionControlInfo.getVersion()); - versionedGroup.setVersionedFlowCoordinates(coordinates); - - // We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports. - // Otherwise, we will not be able to lookup the port when connecting to it. - for (final Port port : group.getInputPorts()) { - getId(port.getVersionedComponentId(), port.getIdentifier()); - } - for (final Port port : group.getOutputPorts()) { - getId(port.getVersionedComponentId(), port.getIdentifier()); - } - - // If the Process Group itself is remotely versioned, then we don't want to include its contents - // because the contents are remotely managed and not part of the versioning of this Process Group - if (!mapDescendantVersionedFlows) { - return versionedGroup; - } - } - } - - versionedGroup.setControllerServices(group.getControllerServices(false).stream() - .map(service -> mapControllerService(service, serviceProvider, includedGroupIds, externalControllerServiceReferences)) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setFunnels(group.getFunnels().stream() - .map(this::mapFunnel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setInputPorts(group.getInputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setOutputPorts(group.getOutputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setLabels(group.getLabels().stream() - .map(this::mapLabel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessors(group.getProcessors().stream() - .map(processor -> mapProcessor(processor, serviceProvider, includedGroupIds, externalControllerServiceReferences)) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() - .map(this::mapRemoteProcessGroup) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessGroups(group.getProcessGroups().stream() - .map(grp -> mapGroup(grp, serviceProvider, registryClient, false, mapDescendantVersionedFlows, includedGroupIds, externalControllerServiceReferences)) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setConnections(group.getConnections().stream() - .map(this::mapConnection) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue))); - - if (topLevel) { - versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences); - } - - return versionedGroup; - } - private String getId(final Optional currentVersionedId, final String componentId) { final String versionedId; if (currentVersionedId.isPresent()) { @@ -279,7 +314,7 @@ public class NiFiRegistryFlowMapper { } - private String getGroupId(final String groupId) { + public String getGroupId(final String groupId) { return versionedComponentIds.get(groupId); } @@ -599,23 +634,39 @@ public class NiFiRegistryFlowMapper { return batchSize; } - public VersionedParameterContext mapParameterContext(final ParameterContext context) { - if (context == null) { - return null; - } - - final Set parameters = context.getParameters().values().stream() - .map(this::mapParameter) - .collect(Collectors.toSet()); - - final VersionedParameterContext versionedContext = new VersionedParameterContext(); - versionedContext.setName(context.getName()); - versionedContext.setParameters(parameters); - - return versionedContext; + public Map mapParameterContexts(final ProcessGroup processGroup, + final boolean mapDescendantVersionedFlows) { + // cannot use a set to enforce uniqueness of parameter contexts because VersionedParameterContext in the + // registry data model doesn't currently implement hashcode/equals based on context name + final Map parameterContexts = new HashMap<>(); + mapParameterContexts(processGroup, mapDescendantVersionedFlows, parameterContexts); + return parameterContexts; } - public VersionedParameter mapParameter(final Parameter parameter) { + private void mapParameterContexts(final ProcessGroup processGroup, final boolean mapDescendantVersionedFlows, + final Map parameterContexts) { + final ParameterContext parameterContext = processGroup.getParameterContext(); + if (parameterContext != null) { + // map this process group's parameter context and add to the collection + final Set parameters = parameterContext.getParameters().values().stream() + .map(this::mapParameter) + .collect(Collectors.toSet()); + + final VersionedParameterContext versionedContext = new VersionedParameterContext(); + versionedContext.setName(parameterContext.getName()); + versionedContext.setParameters(parameters); + parameterContexts.put(versionedContext.getName(), versionedContext); + } + + for (final ProcessGroup child : processGroup.getProcessGroups()) { + // only include child process group parameter contexts if boolean indicator is true or process group is unversioned + if (mapDescendantVersionedFlows || child.getVersionControlInformation() == null) { + mapParameterContexts(child, mapDescendantVersionedFlows, parameterContexts); + } + } + } + + private VersionedParameter mapParameter(final Parameter parameter) { if (parameter == null) { return null; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java index 0a6c9322b6..8d32329b4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java @@ -29,7 +29,6 @@ import org.apache.nifi.registry.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.VersionedProcessGroup; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -170,7 +169,7 @@ public class MockSingleFlowRegistryClient implements FlowRegistryClient { @Override public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final Map externalControllerServices, - final Collection parameterContexts, final String comments, + final Map parameterContexts, final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException { return null; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java new file mode 100644 index 0000000000..3a678110f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java @@ -0,0 +1,780 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.parameter.Parameter; +import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterDescriptor; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.PortType; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; +import org.apache.nifi.registry.flow.VersionedFunnel; +import org.apache.nifi.registry.flow.VersionedLabel; +import org.apache.nifi.registry.flow.VersionedParameter; +import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NiFiRegistryFlowMapperTest { + + @Mock + private ExtensionManager extensionManager; + @Mock + private ControllerServiceProvider controllerServiceProvider; + @Mock + private FlowRegistryClient flowRegistryClient; + + private NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(extensionManager); + + private int counter = 1; + + @Before + public void setup() { + final FlowRegistry flowRegistry = mock(FlowRegistry.class); + when(flowRegistryClient.getFlowRegistry(anyString())).thenReturn(flowRegistry); + when(flowRegistry.getURL()).thenReturn("url"); + } + + /** + * Test mapping versioned process group's parameter contexts excluding descendant versioned process groups + */ + @Test + public void testMapParameterContextsExcludingVersionedDescendants() { + final ProcessGroup innerInnerProcessGroup = + prepareProcessGroupWithParameterContext(Collections.emptyList(), + true, true); + final ProcessGroup innerProcessGroup = + prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup), + true, false); + final ProcessGroup processGroup = + prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup), + false, false); + + // first nesting should be traversed because child is not version controlled, but deeper nesting should be ignored + // because map versioned descendants indicator is false + final Map versionedParameterContexts = + flowMapper.mapParameterContexts(processGroup, false); + + // verify single parameter context + assertEquals(1, versionedParameterContexts.size()); + + final String expectedName = innerProcessGroup.getParameterContext().getName(); + verifyParameterContext(innerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName)); + } + + /** + * Test mapping nested process group's parameter contexts + */ + @Test + public void testMapNestedParameterContexts() { + final ProcessGroup innerInnerProcessGroup = + prepareProcessGroupWithParameterContext(Collections.emptyList(), + true, true); + final ProcessGroup innerProcessGroup = + prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup), + false, true); + final ProcessGroup processGroup = + prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup), + true, true); + + // include nested parameter contexts even though they are version controlled because map descendant indicator is true + final Map versionedParameterContexts = + flowMapper.mapParameterContexts(processGroup, true); + + // verify parameter contexts + assertEquals(2, versionedParameterContexts.size()); + + final String expectedName1 = processGroup.getParameterContext().getName(); + final String expectedName2 = innerInnerProcessGroup.getParameterContext().getName(); + verifyParameterContext(processGroup.getParameterContext(), versionedParameterContexts.get(expectedName1)); + verifyParameterContext(innerInnerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName2)); + } + + /** + * Test mapping a versioned ProcessGroup model to a versioned VersionedProcessGroup excluding descendant versioned flows. + * VersionControlInformation should be mapped to the versioned inner process group instead of the group contents + */ + @Test + public void testMapVersionedProcessGroupsExcludingVersionedDescendants() { + // prepare a versioned process group with a nested versioned process group, each with 1 processor + final ProcessGroup innerProcessGroup = + prepareProcessGroup(1,false, false, false, + false, false,null, + false, true, Collections.emptyList()); + final ProcessGroup processGroup = + prepareProcessGroup(1,false,false, false, + false, false, null, + false, true, Lists.newArrayList(innerProcessGroup)); + + final List allProcessGroups = Lists.newArrayList(innerProcessGroup); + when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups); + + // perform the mapping, excluding descendant versioned flows + final InstantiatedVersionedProcessGroup versionedProcessGroup = + flowMapper.mapProcessGroup(processGroup, controllerServiceProvider, flowRegistryClient, + false); + final VersionedProcessGroup innerVersionedProcessGroup = + versionedProcessGroup.getProcessGroups().iterator().next(); + + // verify root versioned process group contents only + verifyVersionedProcessGroup(processGroup, versionedProcessGroup,false,false); + + // verify versioned descendant is present with VersionControlInfo only + verifyVersionedProcessGroup(innerProcessGroup, innerVersionedProcessGroup,true,false); + } + + /** + * Test mapping a versioned ProcessGroup model to a non-versioned VersionedProcessGroup. Version info is ignored. + * Most elements are exercised here... including labels, ports, processors, connections, funnels, nested process groups, + * remote process groups, congtroller services, external controller service references and variable registries. + */ + @Test + public void testMapNonVersionedProcessGroups() { + // create a controller service with a different process group id so it's treated as external + final ControllerServiceNode externalControllerServiceNode = prepareControllerService(UUID.randomUUID().toString()); + + // prepare a process group with nested process groups + final ProcessGroup innerInnerProcessGroup = + prepareProcessGroup(0,false, true, false, + true, false,null, + true, false, Collections.emptyList()); + final ProcessGroup innerProcessGroup = + prepareProcessGroup(1,true, false, false, + true, true, externalControllerServiceNode, + true, true, Lists.newArrayList(innerInnerProcessGroup)); + final ProcessGroup processGroup = + prepareProcessGroup(2,false,false, true, + false, true, null, + false, true, Lists.newArrayList(innerProcessGroup)); + + final List allProcessGroups = Lists.newArrayList(innerProcessGroup, innerInnerProcessGroup); + when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups); + + // perform the mapping + final InstantiatedVersionedProcessGroup versionedProcessGroup = + flowMapper.mapNonVersionedProcessGroup(processGroup, controllerServiceProvider); + + // recursively verify versioned process group contents + verifyVersionedProcessGroup(processGroup, versionedProcessGroup, false,true); + + // verify external controller service reference + final Map externalControllerServiceReferences = + versionedProcessGroup.getExternalControllerServiceReferences(); + final String expectedExternalControllerServiceReferenceKey = flowMapper.getGroupId(externalControllerServiceNode.getIdentifier()); + final ExternalControllerServiceReference externalControllerServiceReference = + externalControllerServiceReferences.get(expectedExternalControllerServiceReferenceKey); + assertNotNull(externalControllerServiceReference); + assertEquals(expectedExternalControllerServiceReferenceKey, externalControllerServiceReference.getIdentifier()); + assertEquals(externalControllerServiceNode.getName(), externalControllerServiceReference.getName()); + } + + private ProcessGroup prepareProcessGroupWithParameterContext(final List childProcessGroups, + final boolean includeParameterContext, + final boolean isVersionControlled) { + final ProcessGroup processGroup = mock(ProcessGroup.class); + + if (includeParameterContext) { + final ParameterContext parameterContext = mock(ParameterContext.class); + when(processGroup.getParameterContext()).thenReturn(parameterContext); + when(parameterContext.getName()).thenReturn("context" + (counter++)); + final Map parametersMap = Maps.newHashMap(); + when(parameterContext.getParameters()).thenReturn(parametersMap); + + addParameter(parametersMap, "value" + (counter++), false); + addParameter(parametersMap, "value" + (counter++), true); + addParameter(parametersMap, null, true); + } + + if (isVersionControlled) { + when(processGroup.getVersionControlInformation()).thenReturn(mock(VersionControlInformation.class)); + } + + when(processGroup.getProcessGroups()).thenReturn(Sets.newLinkedHashSet(childProcessGroups)); + + return processGroup; + } + + private void addParameter(final Map parametersMap, final String value, final boolean isSensitive) { + + final ParameterDescriptor parameterDescriptor = + new ParameterDescriptor.Builder().name("param" + (counter++)).description("description" + (counter++)).sensitive(isSensitive).build(); + final Parameter parameter = mock(Parameter.class); + when(parameter.getDescriptor()).thenReturn(parameterDescriptor); + when(parameter.getValue()).thenReturn(value); + parametersMap.put(parameterDescriptor, parameter); + } + + private void verifyParameterContext(final ParameterContext parameterContext, final VersionedParameterContext versionedParameterContext) { + assertEquals(parameterContext.getName(), versionedParameterContext.getName()); + + final Collection parameters = parameterContext.getParameters().values(); + final Set versionedParameters = versionedParameterContext.getParameters(); + // parameter order is not deterministic - use unique names to map up matching parameters + final Iterator parametersIterator = parameters.iterator(); + while (parametersIterator.hasNext()) { + final Parameter parameter = parametersIterator.next(); + final Iterator versionedParameterIterator = versionedParameters.iterator(); + while (versionedParameterIterator.hasNext()) { + final VersionedParameter versionedParameter = versionedParameterIterator.next(); + if (versionedParameter.getName().equals(parameter.getDescriptor().getName())) { + verifyParameter(versionedParameter, parameter); + versionedParameterIterator.remove(); + break; + } + } + } + assertTrue("Failed to match parameters by unique name", versionedParameters.isEmpty()); + + } + + private void verifyParameter(final VersionedParameter versionedParameter, final Parameter parameter) { + final ParameterDescriptor parameterDescriptor = parameter.getDescriptor(); + + assertEquals(parameterDescriptor.getName(), versionedParameter.getName()); + assertEquals(parameterDescriptor.getDescription(), versionedParameter.getDescription()); + assertEquals(parameterDescriptor.isSensitive(), versionedParameter.isSensitive()); + if (parameterDescriptor.isSensitive()) { + // verify parameter value is null for sensitive parameters + assertNull(versionedParameter.getValue()); + } else { + assertEquals(parameter.getValue(), versionedParameter.getValue()); + } + } + + private ProcessGroup prepareProcessGroup(final int numProcessors, final boolean includeFunnel,final boolean includePorts, + final boolean includeLabels, final boolean includeVariableRegistry, + final boolean includeControllerService, + final ControllerServiceNode externalControllerServiceNode, + final boolean includeRemoteProcessGroup, final boolean includeVersionControlInfo, + final List childProcessGroups) { + final String processGroupId = UUID.randomUUID().toString(); + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroup.getIdentifier()).thenReturn(processGroupId); + when(processGroup.getProcessGroupIdentifier()).thenReturn(processGroupId); + when(processGroup.getName()).thenReturn("group"+(counter++)); + when(processGroup.getComments()).thenReturn("comments"+(counter++)); + when(processGroup.getPosition()).thenReturn(new Position(counter++, counter++)); + final ParameterContext parameterContext = mock(ParameterContext.class); + when(processGroup.getParameterContext()).thenReturn(parameterContext); + when(parameterContext.getName()).thenReturn("context"+(counter++)); + + // prep funnels + final Set funnels = Sets.newHashSet(); + if (includeFunnel) { + funnels.add(prepareFunnel(processGroupId)); + } + when(processGroup.getFunnels()).thenReturn(funnels); + + // prep ports + final Set inputPorts = Sets.newHashSet(); + final Set outputPorts = Sets.newHashSet(); + if (includePorts) { + inputPorts.add(preparePort(processGroupId, PortType.INPUT_PORT)); + outputPorts.add(preparePort(processGroupId, PortType.OUTPUT_PORT)); + } + when(processGroup.getInputPorts()).thenReturn(inputPorts); + when(processGroup.getOutputPorts()).thenReturn(outputPorts); + + // prep labels + final Set