NIFI-6872:

- Added UI versioned flow supportsDownload functionality with download flow menu item
- Added VersionsResource endpoint for downloading versioned flow with registry-related info removed
- Added ProcessGroupResource endpoint for downloading current flow with registry-related info removed
- Added StandardNifiServiceFacade functionality for downloading both current and versioned flow
- Added XmlTransient markers on variables introduced by Instantiated model classes so they do not appear in serialized download
- Updated NiFiRegistryFlowMapper.mapParameterContexts to handle mapping nested parameter contexts for use in producing a complete VersionedFlowSnapshot
- Added ability for NiFiRegistryFlowMapper to map nested process groups ignoring versioning for use in producing a complete VersionedFlowSnapshot
- Added unit tests where helpful

NIFI-6872: PR response...
- Updated mapParameterContext to return a Map to handle uniqueness of contexts by name since ultimately everything converted it to a map anyway. The VersionedParameterContext class from the registry model doesn't support hashcode/equals currently so returning a Set wouldn't work.
- Updated assert calls to put expected value as first parameter and actual as second parameter
- Added one time password (OTP) support for flow download endpoint to support non cert based authentication

This closes #3931
This commit is contained in:
Joe Ferner 2019-11-19 10:15:42 -05:00 committed by Matt Gilman
parent f398299cfe
commit 058883091c
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
19 changed files with 1535 additions and 339 deletions

View File

@ -31,7 +31,7 @@ public class VersionedFlowEntity extends Entity {
return versionedFlow;
}
public void setVersionedFlow(VersionedFlowDTO versionedFLow) {
this.versionedFlow = versionedFLow;
public void setVersionedFlow(VersionedFlowDTO versionedFlow) {
this.versionedFlow = versionedFlow;
}
}

View File

@ -23,7 +23,6 @@ import org.apache.nifi.registry.client.NiFiRegistryException;
import java.io.IOException;
import java.util.Map;
import java.util.Collection;
import java.util.Set;
public interface FlowRegistry {
@ -134,7 +133,7 @@ public interface FlowRegistry {
* @param snapshot the snapshot of the flow
* @param externalControllerServices a mapping of of Controller Service identifier to ExternalControllerServiceReference for any Controller Service that is referenced by the flow but that are
* not included as part of the VersionedProcessGroup
* @param parameterContexts the Parameter Contexts to include in the snapshot
* @param parameterContexts a map of the Parameter Contexts to include in the snapshot keyed by name
* @param comments any comments for the snapshot
* @param expectedVersion the version of the flow that we expect to save this snapshot as
* @return the versioned flow snapshot
@ -144,7 +143,7 @@ public interface FlowRegistry {
* @throws NiFiRegistryException if the flow does not exist
*/
VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, Map<String, ExternalControllerServiceReference> externalControllerServices,
Collection<VersionedParameterContext> parameterContexts, String comments,
Map<String, VersionedParameterContext> parameterContexts, String comments,
int expectedVersion, NiFiUser user) throws IOException, NiFiRegistryException;
/**

View File

@ -29,14 +29,12 @@ import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class RestBasedFlowRegistry implements FlowRegistry {
private static final String FLOW_ENCODING_VERSION = "1.0";
public static final String FLOW_ENCODING_VERSION = "1.0";
private final FlowRegistryClient flowRegistryClient;
private final String identifier;
@ -179,17 +177,13 @@ public class RestBasedFlowRegistry implements FlowRegistry {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
final Map<String, ExternalControllerServiceReference> externalControllerServices,
Collection<VersionedParameterContext> parameterContexts, final String comments, final int expectedVersion,
final NiFiUser user) throws IOException, NiFiRegistryException {
final Map<String, VersionedParameterContext> parameterContextMap = parameterContexts.stream()
.collect(Collectors.toMap(VersionedParameterContext::getName, context -> context));
final Map<String, VersionedParameterContext> parameterContexts, final String comments,
final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user);
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setFlowContents(snapshot);
versionedFlowSnapshot.setExternalControllerServices(externalControllerServices);
versionedFlowSnapshot.setParameterContexts(parameterContextMap);
versionedFlowSnapshot.setParameterContexts(parameterContexts);
versionedFlowSnapshot.setFlowEncodingVersion(FLOW_ENCODING_VERSION);
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();

View File

@ -17,8 +17,14 @@
package org.apache.nifi.registry.flow.mapping;
import javax.xml.bind.annotation.XmlTransient;
public interface InstantiatedVersionedComponent {
// mark transient so fields are ignored when serializing all versioned component types
@XmlTransient
String getInstanceId();
@XmlTransient
String getInstanceGroupId();
}

View File

@ -20,9 +20,11 @@ package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import javax.xml.bind.annotation.XmlTransient;
import java.util.Map;
public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup implements InstantiatedVersionedComponent {
private final String instanceId;
private final String groupId;
private Map<String, ExternalControllerServiceReference> externalControllerServiceReferences;
@ -46,6 +48,8 @@ public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup imp
this.externalControllerServiceReferences = externalControllerServiceReferences;
}
// mark transient so field is ignored when serializing this class
@XmlTransient
public Map<String, ExternalControllerServiceReference> getExternalControllerServiceReferences() {
return externalControllerServiceReferences;
}

View File

@ -27,8 +27,8 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.service.ControllerServiceNode;
@ -80,6 +80,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -90,7 +91,7 @@ public class NiFiRegistryFlowMapper {
// We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
// we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
// identifier based on the comopnent's actual id. We do connections last, so that all components will already have been
// identifier based on the component's actual id. We do connections last, so that all components will already have been
// created before attempting to create the connection, where the ConnectableDTO is converted.
private Map<String, String> versionedComponentIds = new HashMap<>();
@ -98,14 +99,157 @@ public class NiFiRegistryFlowMapper {
this.extensionManager = extensionManager;
}
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient,
/**
* Map the given process group to a versioned process group without any use of an actual flow registry even if the
* group is currently versioned in a registry.
*
* @param group the process group to map
* @param serviceProvider the controller service provider to use for mapping
* @return a complete versioned process group without any registry related details
*/
public InstantiatedVersionedProcessGroup mapNonVersionedProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider) {
versionedComponentIds.clear();
// always include descendant flows and do not apply any registry versioning info that may be present in the group
return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> true);
}
/**
* Map the given process group to a versioned process group using the provided registry client.
*
* @param group the process group to map
* @param serviceProvider the controller service provider to use for mapping
* @param registryClient the registry client to use when retrieving versioning details
* @param mapDescendantVersionedFlows true in order to include descendant flows in the mapped result
* @return a complete versioned process group with applicable registry related details
*/
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider,
final FlowRegistryClient registryClient,
final boolean mapDescendantVersionedFlows) {
versionedComponentIds.clear();
final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows);
populateReferencedAncestorVariables(group, mapped);
// apply registry versioning according to the lambda below
// NOTE: lambda refers to registry client and map descendant boolean which will not change during recursion
return mapGroup(group, serviceProvider, (processGroup, versionedGroup) -> {
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
if (versionControlInfo != null) {
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
final String registryId = versionControlInfo.getRegistryIdentifier();
final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL.");
}
return mapped;
coordinates.setRegistryUrl(registry.getURL());
coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
coordinates.setVersion(versionControlInfo.getVersion());
versionedGroup.setVersionedFlowCoordinates(coordinates);
// We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports.
// Otherwise, we will not be able to lookup the port when connecting to it.
for (final Port port : processGroup.getInputPorts()) {
getId(port.getVersionedComponentId(), port.getIdentifier());
}
for (final Port port : processGroup.getOutputPorts()) {
getId(port.getVersionedComponentId(), port.getIdentifier());
}
// If the Process Group itself is remotely versioned, then we don't want to include its contents
// because the contents are remotely managed and not part of the versioning of this Process Group
return mapDescendantVersionedFlows;
}
return true;
});
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider,
final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo) {
final Set<String> allIncludedGroupsIds = group.findAllProcessGroups().stream()
.map(ProcessGroup::getIdentifier)
.collect(Collectors.toSet());
allIncludedGroupsIds.add(group.getIdentifier());
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = new HashMap<>();
final InstantiatedVersionedProcessGroup versionedGroup =
mapGroup(group, serviceProvider, applyVersionControlInfo, true, allIncludedGroupsIds, externalControllerServiceReferences);
populateReferencedAncestorVariables(group, versionedGroup);
return versionedGroup;
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider,
final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo,
final boolean topLevel, final Set<String> includedGroupIds,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
versionedGroup.setName(group.getName());
versionedGroup.setComments(group.getComments());
versionedGroup.setPosition(mapPosition(group.getPosition()));
final ParameterContext parameterContext = group.getParameterContext();
versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName());
// If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for,
// then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used
// only for a child group that is itself version controlled.
if (!topLevel) {
final boolean mapDescendantVersionedFlows = applyVersionControlInfo.apply(group, versionedGroup);
// return here if we do not want to include remotely managed descendant flows
if (!mapDescendantVersionedFlows) {
return versionedGroup;
}
}
versionedGroup.setControllerServices(group.getControllerServices(false).stream()
.map(service -> mapControllerService(service, serviceProvider, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setFunnels(group.getFunnels().stream()
.map(this::mapFunnel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setInputPorts(group.getInputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setOutputPorts(group.getOutputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setLabels(group.getLabels().stream()
.map(this::mapLabel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessors(group.getProcessors().stream()
.map(processor -> mapProcessor(processor, serviceProvider, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream()
.map(this::mapRemoteProcessGroup)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(group.getProcessGroups().stream()
.map(grp -> mapGroup(grp, serviceProvider, applyVersionControlInfo, false, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(group.getConnections().stream()
.map(this::mapConnection)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue)));
if (topLevel) {
versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences);
}
return versionedGroup;
}
private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
@ -138,121 +282,12 @@ public class NiFiRegistryFlowMapper {
}
group.getVariableRegistry().getVariableMap().keySet().stream()
.map(VariableDescriptor::getName)
.forEach(variableNames::add);
.map(VariableDescriptor::getName)
.forEach(variableNames::add);
populateVariableNames(group.getParent(), variableNames);
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient,
final boolean topLevel, final boolean mapDescendantVersionedFlows) {
final Set<String> allIncludedGroupsIds = group.findAllProcessGroups().stream()
.map(ProcessGroup::getIdentifier)
.collect(Collectors.toSet());
allIncludedGroupsIds.add(group.getIdentifier());
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = new HashMap<>();
return mapGroup(group, serviceProvider, registryClient, topLevel, mapDescendantVersionedFlows, allIncludedGroupsIds, externalControllerServiceReferences);
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient,
final boolean topLevel, final boolean mapDescendantVersionedFlows, final Set<String> includedGroupIds,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
versionedGroup.setName(group.getName());
versionedGroup.setComments(group.getComments());
versionedGroup.setPosition(mapPosition(group.getPosition()));
final ParameterContext parameterContext = group.getParameterContext();
versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName());
// If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for,
// then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used
// only for a child group that is itself version controlled.
if (!topLevel) {
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
final String registryId = versionControlInfo.getRegistryIdentifier();
final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL.");
}
coordinates.setRegistryUrl(registry.getURL());
coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
coordinates.setVersion(versionControlInfo.getVersion());
versionedGroup.setVersionedFlowCoordinates(coordinates);
// We need to register the Port ID -> Versioned Component ID's in our versionedComponentIds member variable for all input & output ports.
// Otherwise, we will not be able to lookup the port when connecting to it.
for (final Port port : group.getInputPorts()) {
getId(port.getVersionedComponentId(), port.getIdentifier());
}
for (final Port port : group.getOutputPorts()) {
getId(port.getVersionedComponentId(), port.getIdentifier());
}
// If the Process Group itself is remotely versioned, then we don't want to include its contents
// because the contents are remotely managed and not part of the versioning of this Process Group
if (!mapDescendantVersionedFlows) {
return versionedGroup;
}
}
}
versionedGroup.setControllerServices(group.getControllerServices(false).stream()
.map(service -> mapControllerService(service, serviceProvider, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setFunnels(group.getFunnels().stream()
.map(this::mapFunnel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setInputPorts(group.getInputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setOutputPorts(group.getOutputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setLabels(group.getLabels().stream()
.map(this::mapLabel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessors(group.getProcessors().stream()
.map(processor -> mapProcessor(processor, serviceProvider, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream()
.map(this::mapRemoteProcessGroup)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(group.getProcessGroups().stream()
.map(grp -> mapGroup(grp, serviceProvider, registryClient, false, mapDescendantVersionedFlows, includedGroupIds, externalControllerServiceReferences))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(group.getConnections().stream()
.map(this::mapConnection)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue)));
if (topLevel) {
versionedGroup.setExternalControllerServiceReferences(externalControllerServiceReferences);
}
return versionedGroup;
}
private String getId(final Optional<String> currentVersionedId, final String componentId) {
final String versionedId;
if (currentVersionedId.isPresent()) {
@ -279,7 +314,7 @@ public class NiFiRegistryFlowMapper {
}
private String getGroupId(final String groupId) {
public String getGroupId(final String groupId) {
return versionedComponentIds.get(groupId);
}
@ -599,23 +634,39 @@ public class NiFiRegistryFlowMapper {
return batchSize;
}
public VersionedParameterContext mapParameterContext(final ParameterContext context) {
if (context == null) {
return null;
}
final Set<VersionedParameter> parameters = context.getParameters().values().stream()
.map(this::mapParameter)
.collect(Collectors.toSet());
final VersionedParameterContext versionedContext = new VersionedParameterContext();
versionedContext.setName(context.getName());
versionedContext.setParameters(parameters);
return versionedContext;
public Map<String, VersionedParameterContext> mapParameterContexts(final ProcessGroup processGroup,
final boolean mapDescendantVersionedFlows) {
// cannot use a set to enforce uniqueness of parameter contexts because VersionedParameterContext in the
// registry data model doesn't currently implement hashcode/equals based on context name
final Map<String, VersionedParameterContext> parameterContexts = new HashMap<>();
mapParameterContexts(processGroup, mapDescendantVersionedFlows, parameterContexts);
return parameterContexts;
}
public VersionedParameter mapParameter(final Parameter parameter) {
private void mapParameterContexts(final ProcessGroup processGroup, final boolean mapDescendantVersionedFlows,
final Map<String, VersionedParameterContext> parameterContexts) {
final ParameterContext parameterContext = processGroup.getParameterContext();
if (parameterContext != null) {
// map this process group's parameter context and add to the collection
final Set<VersionedParameter> parameters = parameterContext.getParameters().values().stream()
.map(this::mapParameter)
.collect(Collectors.toSet());
final VersionedParameterContext versionedContext = new VersionedParameterContext();
versionedContext.setName(parameterContext.getName());
versionedContext.setParameters(parameters);
parameterContexts.put(versionedContext.getName(), versionedContext);
}
for (final ProcessGroup child : processGroup.getProcessGroups()) {
// only include child process group parameter contexts if boolean indicator is true or process group is unversioned
if (mapDescendantVersionedFlows || child.getVersionControlInformation() == null) {
mapParameterContexts(child, mapDescendantVersionedFlows, parameterContexts);
}
}
}
private VersionedParameter mapParameter(final Parameter parameter) {
if (parameter == null) {
return null;
}

View File

@ -29,7 +29,6 @@ import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -170,7 +169,7 @@ public class MockSingleFlowRegistryClient implements FlowRegistryClient {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot,
final Map<String, ExternalControllerServiceReference> externalControllerServices,
final Collection<VersionedParameterContext> parameterContexts, final String comments,
final Map<String, VersionedParameterContext> parameterContexts, final String comments,
final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException {
return null;
}

View File

@ -0,0 +1,780 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow.mapping;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class NiFiRegistryFlowMapperTest {
@Mock
private ExtensionManager extensionManager;
@Mock
private ControllerServiceProvider controllerServiceProvider;
@Mock
private FlowRegistryClient flowRegistryClient;
private NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(extensionManager);
private int counter = 1;
@Before
public void setup() {
final FlowRegistry flowRegistry = mock(FlowRegistry.class);
when(flowRegistryClient.getFlowRegistry(anyString())).thenReturn(flowRegistry);
when(flowRegistry.getURL()).thenReturn("url");
}
/**
* Test mapping versioned process group's parameter contexts excluding descendant versioned process groups
*/
@Test
public void testMapParameterContextsExcludingVersionedDescendants() {
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroupWithParameterContext(Collections.emptyList(),
true, true);
final ProcessGroup innerProcessGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup),
true, false);
final ProcessGroup processGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup),
false, false);
// first nesting should be traversed because child is not version controlled, but deeper nesting should be ignored
// because map versioned descendants indicator is false
final Map<String, VersionedParameterContext> versionedParameterContexts =
flowMapper.mapParameterContexts(processGroup, false);
// verify single parameter context
assertEquals(1, versionedParameterContexts.size());
final String expectedName = innerProcessGroup.getParameterContext().getName();
verifyParameterContext(innerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName));
}
/**
* Test mapping nested process group's parameter contexts
*/
@Test
public void testMapNestedParameterContexts() {
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroupWithParameterContext(Collections.emptyList(),
true, true);
final ProcessGroup innerProcessGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerInnerProcessGroup),
false, true);
final ProcessGroup processGroup =
prepareProcessGroupWithParameterContext(Lists.newArrayList(innerProcessGroup),
true, true);
// include nested parameter contexts even though they are version controlled because map descendant indicator is true
final Map<String, VersionedParameterContext> versionedParameterContexts =
flowMapper.mapParameterContexts(processGroup, true);
// verify parameter contexts
assertEquals(2, versionedParameterContexts.size());
final String expectedName1 = processGroup.getParameterContext().getName();
final String expectedName2 = innerInnerProcessGroup.getParameterContext().getName();
verifyParameterContext(processGroup.getParameterContext(), versionedParameterContexts.get(expectedName1));
verifyParameterContext(innerInnerProcessGroup.getParameterContext(), versionedParameterContexts.get(expectedName2));
}
/**
* Test mapping a versioned ProcessGroup model to a versioned VersionedProcessGroup excluding descendant versioned flows.
* VersionControlInformation should be mapped to the versioned inner process group instead of the group contents
*/
@Test
public void testMapVersionedProcessGroupsExcludingVersionedDescendants() {
// prepare a versioned process group with a nested versioned process group, each with 1 processor
final ProcessGroup innerProcessGroup =
prepareProcessGroup(1,false, false, false,
false, false,null,
false, true, Collections.emptyList());
final ProcessGroup processGroup =
prepareProcessGroup(1,false,false, false,
false, false, null,
false, true, Lists.newArrayList(innerProcessGroup));
final List<ProcessGroup> allProcessGroups = Lists.newArrayList(innerProcessGroup);
when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups);
// perform the mapping, excluding descendant versioned flows
final InstantiatedVersionedProcessGroup versionedProcessGroup =
flowMapper.mapProcessGroup(processGroup, controllerServiceProvider, flowRegistryClient,
false);
final VersionedProcessGroup innerVersionedProcessGroup =
versionedProcessGroup.getProcessGroups().iterator().next();
// verify root versioned process group contents only
verifyVersionedProcessGroup(processGroup, versionedProcessGroup,false,false);
// verify versioned descendant is present with VersionControlInfo only
verifyVersionedProcessGroup(innerProcessGroup, innerVersionedProcessGroup,true,false);
}
/**
* Test mapping a versioned ProcessGroup model to a non-versioned VersionedProcessGroup. Version info is ignored.
* Most elements are exercised here... including labels, ports, processors, connections, funnels, nested process groups,
* remote process groups, congtroller services, external controller service references and variable registries.
*/
@Test
public void testMapNonVersionedProcessGroups() {
// create a controller service with a different process group id so it's treated as external
final ControllerServiceNode externalControllerServiceNode = prepareControllerService(UUID.randomUUID().toString());
// prepare a process group with nested process groups
final ProcessGroup innerInnerProcessGroup =
prepareProcessGroup(0,false, true, false,
true, false,null,
true, false, Collections.emptyList());
final ProcessGroup innerProcessGroup =
prepareProcessGroup(1,true, false, false,
true, true, externalControllerServiceNode,
true, true, Lists.newArrayList(innerInnerProcessGroup));
final ProcessGroup processGroup =
prepareProcessGroup(2,false,false, true,
false, true, null,
false, true, Lists.newArrayList(innerProcessGroup));
final List<ProcessGroup> allProcessGroups = Lists.newArrayList(innerProcessGroup, innerInnerProcessGroup);
when(processGroup.findAllProcessGroups()).thenReturn(allProcessGroups);
// perform the mapping
final InstantiatedVersionedProcessGroup versionedProcessGroup =
flowMapper.mapNonVersionedProcessGroup(processGroup, controllerServiceProvider);
// recursively verify versioned process group contents
verifyVersionedProcessGroup(processGroup, versionedProcessGroup, false,true);
// verify external controller service reference
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences =
versionedProcessGroup.getExternalControllerServiceReferences();
final String expectedExternalControllerServiceReferenceKey = flowMapper.getGroupId(externalControllerServiceNode.getIdentifier());
final ExternalControllerServiceReference externalControllerServiceReference =
externalControllerServiceReferences.get(expectedExternalControllerServiceReferenceKey);
assertNotNull(externalControllerServiceReference);
assertEquals(expectedExternalControllerServiceReferenceKey, externalControllerServiceReference.getIdentifier());
assertEquals(externalControllerServiceNode.getName(), externalControllerServiceReference.getName());
}
private ProcessGroup prepareProcessGroupWithParameterContext(final List<ProcessGroup> childProcessGroups,
final boolean includeParameterContext,
final boolean isVersionControlled) {
final ProcessGroup processGroup = mock(ProcessGroup.class);
if (includeParameterContext) {
final ParameterContext parameterContext = mock(ParameterContext.class);
when(processGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getName()).thenReturn("context" + (counter++));
final Map<ParameterDescriptor, Parameter> parametersMap = Maps.newHashMap();
when(parameterContext.getParameters()).thenReturn(parametersMap);
addParameter(parametersMap, "value" + (counter++), false);
addParameter(parametersMap, "value" + (counter++), true);
addParameter(parametersMap, null, true);
}
if (isVersionControlled) {
when(processGroup.getVersionControlInformation()).thenReturn(mock(VersionControlInformation.class));
}
when(processGroup.getProcessGroups()).thenReturn(Sets.newLinkedHashSet(childProcessGroups));
return processGroup;
}
private void addParameter(final Map<ParameterDescriptor, Parameter> parametersMap, final String value, final boolean isSensitive) {
final ParameterDescriptor parameterDescriptor =
new ParameterDescriptor.Builder().name("param" + (counter++)).description("description" + (counter++)).sensitive(isSensitive).build();
final Parameter parameter = mock(Parameter.class);
when(parameter.getDescriptor()).thenReturn(parameterDescriptor);
when(parameter.getValue()).thenReturn(value);
parametersMap.put(parameterDescriptor, parameter);
}
private void verifyParameterContext(final ParameterContext parameterContext, final VersionedParameterContext versionedParameterContext) {
assertEquals(parameterContext.getName(), versionedParameterContext.getName());
final Collection<Parameter> parameters = parameterContext.getParameters().values();
final Set<VersionedParameter> versionedParameters = versionedParameterContext.getParameters();
// parameter order is not deterministic - use unique names to map up matching parameters
final Iterator<Parameter> parametersIterator = parameters.iterator();
while (parametersIterator.hasNext()) {
final Parameter parameter = parametersIterator.next();
final Iterator<VersionedParameter> versionedParameterIterator = versionedParameters.iterator();
while (versionedParameterIterator.hasNext()) {
final VersionedParameter versionedParameter = versionedParameterIterator.next();
if (versionedParameter.getName().equals(parameter.getDescriptor().getName())) {
verifyParameter(versionedParameter, parameter);
versionedParameterIterator.remove();
break;
}
}
}
assertTrue("Failed to match parameters by unique name", versionedParameters.isEmpty());
}
private void verifyParameter(final VersionedParameter versionedParameter, final Parameter parameter) {
final ParameterDescriptor parameterDescriptor = parameter.getDescriptor();
assertEquals(parameterDescriptor.getName(), versionedParameter.getName());
assertEquals(parameterDescriptor.getDescription(), versionedParameter.getDescription());
assertEquals(parameterDescriptor.isSensitive(), versionedParameter.isSensitive());
if (parameterDescriptor.isSensitive()) {
// verify parameter value is null for sensitive parameters
assertNull(versionedParameter.getValue());
} else {
assertEquals(parameter.getValue(), versionedParameter.getValue());
}
}
private ProcessGroup prepareProcessGroup(final int numProcessors, final boolean includeFunnel,final boolean includePorts,
final boolean includeLabels, final boolean includeVariableRegistry,
final boolean includeControllerService,
final ControllerServiceNode externalControllerServiceNode,
final boolean includeRemoteProcessGroup, final boolean includeVersionControlInfo,
final List<ProcessGroup> childProcessGroups) {
final String processGroupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn(processGroupId);
when(processGroup.getProcessGroupIdentifier()).thenReturn(processGroupId);
when(processGroup.getName()).thenReturn("group"+(counter++));
when(processGroup.getComments()).thenReturn("comments"+(counter++));
when(processGroup.getPosition()).thenReturn(new Position(counter++, counter++));
final ParameterContext parameterContext = mock(ParameterContext.class);
when(processGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getName()).thenReturn("context"+(counter++));
// prep funnels
final Set<Funnel> funnels = Sets.newHashSet();
if (includeFunnel) {
funnels.add(prepareFunnel(processGroupId));
}
when(processGroup.getFunnels()).thenReturn(funnels);
// prep ports
final Set<Port> inputPorts = Sets.newHashSet();
final Set<Port> outputPorts = Sets.newHashSet();
if (includePorts) {
inputPorts.add(preparePort(processGroupId, PortType.INPUT_PORT));
outputPorts.add(preparePort(processGroupId, PortType.OUTPUT_PORT));
}
when(processGroup.getInputPorts()).thenReturn(inputPorts);
when(processGroup.getOutputPorts()).thenReturn(outputPorts);
// prep labels
final Set<Label> labels = Sets.newHashSet();
if (includeLabels) {
labels.add(prepareLabel(processGroupId));
}
when(processGroup.getLabels()).thenReturn(labels);
// prep connections and processors
final Set<ProcessorNode> processorNodes = Sets.newLinkedHashSet();
final Set<Connection> connections = Sets.newHashSet();
if (numProcessors == 2) {
// 2 processors connected together
final ProcessorNode processorNode1 = prepareProcessor(processGroup, externalControllerServiceNode);
final ProcessorNode processorNode2 = prepareProcessor(processGroup, externalControllerServiceNode);
processorNodes.add(processorNode1);
processorNodes.add(processorNode2);
connections.add(prepareConnection(processorNode1, processorNode2, processGroup));
} else if (numProcessors == 1) {
// a processor connected to itself
final ProcessorNode processorNode1 = prepareProcessor(processGroup, externalControllerServiceNode);
processorNodes.add(processorNode1);
connections.add(prepareConnection(processorNode1, processorNode1, processGroup));
}
when(processGroup.getProcessors()).thenReturn(processorNodes);
when(processGroup.getConnections()).thenReturn(connections);
// prep controller services
final Set<ControllerServiceNode> controllerServiceNodes = Sets.newHashSet();
if (includeControllerService) {
controllerServiceNodes.add(prepareControllerService(processGroupId));
}
when(processGroup.getControllerServices(false)).thenReturn(controllerServiceNodes);
// prep variable registry
final ComponentVariableRegistry componentVariableRegistry = mock(ComponentVariableRegistry.class);
when(processGroup.getVariableRegistry()).thenReturn(componentVariableRegistry);
final Map<VariableDescriptor, String> registryVariableMap = Maps.newHashMap();
if (includeVariableRegistry) {
registryVariableMap.putAll(prepareVariableRegistry());
}
when(componentVariableRegistry.getVariableMap()).thenReturn(registryVariableMap);
// prepare remote process group
final Set<RemoteProcessGroup> remoteProcessGroups = Sets.newHashSet();
if (includeRemoteProcessGroup) {
remoteProcessGroups.add(prepareRemoteProcessGroup(processGroupId));
}
when(processGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
// prep version control info
if (includeVersionControlInfo) {
final VersionControlInformation versionControlInformation = prepareVersionControlInfo();
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
}
// prep nested process groups
when(processGroup.getProcessGroups()).thenReturn(Sets.newLinkedHashSet(childProcessGroups));
return processGroup;
}
private Funnel prepareFunnel(final String processGroupId) {
final Funnel funnel = mock(Funnel.class);
prepareComponentAuthorizable(funnel, processGroupId);
preparePositionable(funnel);
return funnel;
}
private Port preparePort(final String processGroupId, final PortType portType) {
final Port port = mock(Port.class);
prepareComponentAuthorizable(port, processGroupId);
preparePositionable(port);
prepareConnectable(port, ConnectableType.valueOf(portType.name()));
return port;
}
private Label prepareLabel(final String processGroupId) {
final Label label = mock(Label.class);
prepareComponentAuthorizable(label, processGroupId);
preparePositionable(label);
when(label.getSize()).thenReturn(new Size(counter++, counter++));
return label;
}
private ProcessorNode prepareProcessor(final ProcessGroup processGroup, final ControllerServiceNode externalControllerServiceNode) {
final ProcessorNode processorNode = mock(ProcessorNode.class);
prepareComponentAuthorizable(processorNode, processGroup.getIdentifier());
preparePositionable(processorNode);
prepareConnectable(processorNode, ConnectableType.PROCESSOR);
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processorNode.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet());
when(processorNode.getBulletinLevel()).thenReturn(LogLevel.INFO);
when(processorNode.getExecutionNode()).thenReturn(ExecutionNode.ALL);
when(processorNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
when(processorNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
final String rawPropertyValue = "propValue";
final PropertyDescriptor.Builder propertyDescriptorBuilder =
new PropertyDescriptor.Builder().name("propName").sensitive(false).displayName("displayName");
if (externalControllerServiceNode != null) {
propertyDescriptorBuilder.identifiesControllerService(ControllerService.class);
when(controllerServiceProvider.getControllerServiceNode(rawPropertyValue)).thenReturn(externalControllerServiceNode);
}
final PropertyDescriptor propertyDescriptor = propertyDescriptorBuilder.build();
final PropertyConfiguration propertyConfiguration = mock(PropertyConfiguration.class);
final Map<PropertyDescriptor, PropertyConfiguration> properties = Maps.newHashMap();
properties.put(propertyDescriptor, propertyConfiguration);
when(processorNode.getProperties()).thenReturn(properties);
when(processorNode.getProperty(propertyDescriptor)).thenReturn(propertyConfiguration);
when(propertyConfiguration.getRawValue()).thenReturn(rawPropertyValue);
return processorNode;
}
private Connection prepareConnection(final ProcessorNode sourceProcessorNode, final ProcessorNode destProcessorNode,
final ProcessGroup processGroup) {
final Connection connection = mock(Connection.class);
when(connection.getIdentifier()).thenReturn(UUID.randomUUID().toString());
when(connection.getProcessGroup()).thenReturn(processGroup);
when(connection.getBendPoints()).thenReturn(Lists.newArrayList(new Position(counter++, counter++)));
when(connection.getRelationships()).thenReturn(Lists.newArrayList());
final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(flowFileQueue.getPriorities()).thenReturn(Collections.emptyList());
when(flowFileQueue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE);
when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
when(sourceProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connection.getSource()).thenReturn(sourceProcessorNode);
when(destProcessorNode.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connection.getDestination()).thenReturn(destProcessorNode);
return connection;
}
private Map<VariableDescriptor, String> prepareVariableRegistry() {
final VariableDescriptor variableDescriptor =
new VariableDescriptor.Builder("variable"+(counter++)).build();
final Map<VariableDescriptor, String> variableRegistryMap = Maps.newHashMap();
variableRegistryMap.put(variableDescriptor, "value"+(counter++));
return variableRegistryMap;
}
private ControllerServiceNode prepareControllerService(final String processGroupId) {
final ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
prepareComponentAuthorizable(controllerServiceNode, processGroupId);
when(controllerServiceNode.getName()).thenReturn("service" + (counter++));
when(controllerServiceNode.getBundleCoordinate()).thenReturn(mock(BundleCoordinate.class));
when(controllerServiceNode.getControllerServiceImplementation()).thenReturn(mock(ControllerService.class));
when(controllerServiceNode.getProperties()).thenReturn(Collections.emptyMap());
return controllerServiceNode;
}
private RemoteProcessGroup prepareRemoteProcessGroup(final String processGroupId) {
final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
prepareComponentAuthorizable(remoteProcessGroup, processGroupId);
preparePositionable(remoteProcessGroup);
when(remoteProcessGroup.getName()).thenReturn("remote" + (counter++));
when(remoteProcessGroup.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
final RemoteGroupPort remoteGroupInputPort = prepareRemoteGroupPort(remoteProcessGroup);
when(remoteProcessGroup.getInputPorts()).thenReturn(Sets.newHashSet(remoteGroupInputPort));
final RemoteGroupPort remoteGroupOutputPort = prepareRemoteGroupPort(remoteProcessGroup);
when(remoteProcessGroup.getOutputPorts()).thenReturn(Sets.newHashSet(remoteGroupOutputPort));
return remoteProcessGroup;
}
private RemoteGroupPort prepareRemoteGroupPort(final RemoteProcessGroup remoteProcessGroup) {
final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
prepareComponentAuthorizable(remoteGroupPort, remoteProcessGroup.getIdentifier());
when(remoteGroupPort.getName()).thenReturn("remotePort" + (counter++));
when(remoteGroupPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
return remoteGroupPort;
}
private VersionControlInformation prepareVersionControlInfo() {
final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class);
when(versionControlInformation.getRegistryIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getBucketIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getFlowIdentifier()).thenReturn(UUID.randomUUID().toString());
when(versionControlInformation.getVersion()).thenReturn(counter++);
return versionControlInformation;
}
private void prepareComponentAuthorizable(final ComponentAuthorizable authorizable, final String processGroupId) {
when(authorizable.getIdentifier()).thenReturn(UUID.randomUUID().toString());
when(authorizable.getProcessGroupIdentifier()).thenReturn(processGroupId);
}
private void preparePositionable(final Positionable positionable) {
when(positionable.getPosition()).thenReturn(new Position(counter++, counter++));
}
private void prepareConnectable(final Connectable connectable, final ConnectableType connectableType) {
when(connectable.getName()).thenReturn("connectable" + (counter++));
when(connectable.getConnectableType()).thenReturn(connectableType);
}
/**
* Verify the given VersionedProcessGroup was correctly mapped from the given ProcessGroup, including child groups
* when boolean indicator is true. If expectVersionControlInfo boolean indicator is true, the mapped group should
* only contain VersionControlInfo, otherwise it should instead contain its full contents.
*/
private void verifyVersionedProcessGroup(final ProcessGroup processGroup, final VersionedProcessGroup versionedProcessGroup,
final boolean expectVersionControlInfo, final boolean verifyChildProcessGroups) {
final String expectedGroupIdentifier = flowMapper.getGroupId(processGroup.getProcessGroupIdentifier());
// verify process group fields
assertEquals(processGroup.getName(), versionedProcessGroup.getName());
assertEquals(flowMapper.getGroupId(processGroup.getIdentifier()), versionedProcessGroup.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedProcessGroup.getGroupIdentifier());
assertEquals(processGroup.getComments(), versionedProcessGroup.getComments());
assertEquals(processGroup.getPosition().getX(), versionedProcessGroup.getPosition().getX(), 0);
assertEquals(processGroup.getPosition().getY(), versionedProcessGroup.getPosition().getY(), 0);
final String expectedParameterContextName =
(processGroup.getParameterContext() != null ? processGroup.getParameterContext().getName() : null);
assertEquals(expectedParameterContextName, versionedProcessGroup.getParameterContextName());
// verify either version control info or full group contents
if (expectVersionControlInfo) {
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
final VersionedFlowCoordinates versionedFlowCoordinates = versionedProcessGroup.getVersionedFlowCoordinates();
assertNotNull(versionedFlowCoordinates.getRegistryUrl());
assertEquals(versionControlInfo.getBucketIdentifier(), versionedFlowCoordinates.getBucketId());
assertEquals(versionControlInfo.getFlowIdentifier(), versionedFlowCoordinates.getFlowId());
assertEquals(versionControlInfo.getVersion(), versionedFlowCoordinates.getVersion());
} else {
assertNull(versionedProcessGroup.getVersionedFlowCoordinates());
// verify funnels
final Set<Funnel> funnels = processGroup.getFunnels();
final Set<VersionedFunnel> versionedFunnels = versionedProcessGroup.getFunnels();
if (funnels.isEmpty()) {
assertTrue(versionedFunnels.isEmpty());
} else {
final Funnel funnel = funnels.iterator().next();
final VersionedFunnel versionedFunnel = versionedFunnels.iterator().next();
assertEquals(flowMapper.getGroupId(funnel.getIdentifier()), versionedFunnel.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedFunnel.getGroupIdentifier());
assertEquals(funnel.getPosition().getX(), versionedFunnel.getPosition().getX(), 0);
assertEquals(funnel.getPosition().getY(), versionedFunnel.getPosition().getY(), 0);
}
// verify ports
verifyPorts(processGroup.getInputPorts(), versionedProcessGroup.getInputPorts(), PortType.INPUT_PORT, expectedGroupIdentifier);
verifyPorts(processGroup.getOutputPorts(), versionedProcessGroup.getOutputPorts(), PortType.OUTPUT_PORT, expectedGroupIdentifier);
// verify labels
final Set<Label> labels = processGroup.getLabels();
final Set<VersionedLabel> versionedLabels = versionedProcessGroup.getLabels();
if (labels.isEmpty()) {
assertTrue(versionedLabels.isEmpty());
} else {
final Label label = labels.iterator().next();
final VersionedLabel versionedLabel = versionedLabels.iterator().next();
assertEquals(flowMapper.getGroupId(label.getIdentifier()), versionedLabel.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedLabel.getGroupIdentifier());
assertEquals(label.getPosition().getX(), versionedLabel.getPosition().getX(), 0);
assertEquals(label.getPosition().getY(), versionedLabel.getPosition().getY(), 0);
assertEquals(label.getSize().getHeight(), versionedLabel.getHeight(), 0);
assertEquals(label.getSize().getWidth(), versionedLabel.getWidth(), 0);
}
// verify processors
final Collection<ProcessorNode> processorNodes = processGroup.getProcessors();
final Set<VersionedProcessor> versionedProcessors = versionedProcessGroup.getProcessors();
// first verify the number of processors matches
assertEquals(processorNodes.size(), versionedProcessors.size());
// processor order is not deterministic - use unique names to map up matching processors
final Iterator<ProcessorNode> processorNodesIterator = processorNodes.iterator();
while (processorNodesIterator.hasNext()) {
final ProcessorNode processorNode = processorNodesIterator.next();
final Iterator<VersionedProcessor> versionedProcessorIterator = versionedProcessors.iterator();
while (versionedProcessorIterator.hasNext()) {
final VersionedProcessor versionedProcessor = versionedProcessorIterator.next();
if (versionedProcessor.getName().equals(processorNode.getName())) {
verifyProcessor(versionedProcessor, processorNode, expectedGroupIdentifier);
versionedProcessorIterator.remove();
break;
}
}
}
assertTrue("Failed to match processors by unique name", versionedProcessors.isEmpty());
// verify connections
final Set<Connection> connections = processGroup.getConnections();
final Set<VersionedConnection> versionedConnections = versionedProcessGroup.getConnections();
if (connections.isEmpty()) {
assertTrue(versionedConnections.isEmpty());
} else {
final Connection connection = connections.iterator().next();
final VersionedConnection versionedConnection = versionedConnections.iterator().next();
assertEquals(connection.getName(), versionedConnection.getName());
assertEquals(flowMapper.getGroupId(connection.getIdentifier()), versionedConnection.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedConnection.getGroupIdentifier());
assertEquals(connection.getBendPoints().get(0).getX(), versionedConnection.getBends().get(0).getX(), 0);
assertEquals(connection.getBendPoints().get(0).getY(), versionedConnection.getBends().get(0).getY(), 0);
assertEquals(flowMapper.getGroupId(connection.getSource().getIdentifier()), versionedConnection.getSource().getId());
assertEquals(expectedGroupIdentifier, versionedConnection.getSource().getGroupId());
assertEquals(connection.getSource().getName(), versionedConnection.getSource().getName());
assertEquals(connection.getSource().getConnectableType().name(), versionedConnection.getSource().getType().name());
assertEquals(flowMapper.getGroupId(connection.getDestination().getIdentifier()), versionedConnection.getDestination().getId());
assertEquals(expectedGroupIdentifier, versionedConnection.getDestination().getGroupId());
assertEquals(connection.getDestination().getName(), versionedConnection.getDestination().getName());
assertEquals(connection.getDestination().getConnectableType().name(), versionedConnection.getDestination().getType().name());
}
// verify controller services
final Set<ControllerServiceNode> controllerServiceNodes = processGroup.getControllerServices(false);
final Set<VersionedControllerService> versionedControllerServices = versionedProcessGroup.getControllerServices();
if (controllerServiceNodes.isEmpty()) {
assertTrue(versionedControllerServices.isEmpty());
} else {
final ControllerServiceNode controllerServiceNode = controllerServiceNodes.iterator().next();
final VersionedControllerService versionedControllerService = versionedControllerServices.iterator().next();
assertEquals(controllerServiceNode.getName(), versionedControllerService.getName());
assertEquals(flowMapper.getGroupId(controllerServiceNode.getIdentifier()), versionedControllerService.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedControllerService.getGroupIdentifier());
}
// verify variables
final Map<VariableDescriptor, String> variableRegistryMap = processGroup.getVariableRegistry().getVariableMap();
final Map<String, String> versionedVariableMap = versionedProcessGroup.getVariables();
if (variableRegistryMap.isEmpty()) {
assertTrue(versionedVariableMap.isEmpty());
} else {
final VariableDescriptor variableRegistryKey = variableRegistryMap.keySet().iterator().next();
assertEquals(variableRegistryMap.get(variableRegistryKey), versionedVariableMap.get(variableRegistryKey.getName()));
}
// verify remote process group(s)
final Set<RemoteProcessGroup> remoteProcessGroups = processGroup.getRemoteProcessGroups();
final Set<VersionedRemoteProcessGroup> versionedRemoteProcessGroups = versionedProcessGroup.getRemoteProcessGroups();
if (remoteProcessGroups.isEmpty()) {
assertTrue(versionedRemoteProcessGroups.isEmpty());
} else {
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroups.iterator().next();
final VersionedRemoteProcessGroup versionedRemoteProcessGroup = versionedRemoteProcessGroups.iterator().next();
assertEquals(remoteProcessGroup.getName(), versionedRemoteProcessGroup.getName());
assertEquals(flowMapper.getGroupId(remoteProcessGroup.getIdentifier()), versionedRemoteProcessGroup.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedRemoteProcessGroup.getGroupIdentifier());
assertEquals(remoteProcessGroup.getPosition().getX(), versionedRemoteProcessGroup.getPosition().getX(), 0);
assertEquals(remoteProcessGroup.getPosition().getY(), versionedRemoteProcessGroup.getPosition().getY(), 0);
// verify remote ports
final String expectedPortGroupIdentifier = flowMapper.getGroupId(remoteProcessGroup.getIdentifier());
verifyRemotePorts(remoteProcessGroup.getInputPorts(), versionedRemoteProcessGroup.getInputPorts(),
ComponentType.REMOTE_INPUT_PORT, expectedPortGroupIdentifier);
verifyRemotePorts(remoteProcessGroup.getOutputPorts(), versionedRemoteProcessGroup.getOutputPorts(),
ComponentType.REMOTE_OUTPUT_PORT, expectedPortGroupIdentifier);
}
if (verifyChildProcessGroups) {
// recurse to verify inner process group(s)
final Set<ProcessGroup> innerProcessGroups = processGroup.getProcessGroups();
final Set<VersionedProcessGroup> innerVersionedProcessGroups = versionedProcessGroup.getProcessGroups();
if (innerProcessGroups.isEmpty()) {
assertTrue(innerVersionedProcessGroups.isEmpty());
} else {
final ProcessGroup innerProcessGroup = innerProcessGroups.iterator().next();
final VersionedProcessGroup innerVersionedProcessGroup = innerVersionedProcessGroups.iterator().next();
verifyVersionedProcessGroup(innerProcessGroup, innerVersionedProcessGroup, expectVersionControlInfo,
verifyChildProcessGroups);
}
}
}
}
private void verifyPorts(final Set<Port> ports, final Set<VersionedPort> versionedPorts, final PortType portType,
final String expectedGroupIdentifier) {
if (ports.isEmpty()) {
assertTrue(versionedPorts.isEmpty());
} else {
final Port port = ports.iterator().next();
final VersionedPort versionedPort = versionedPorts.iterator().next();
assertEquals(flowMapper.getGroupId(port.getIdentifier()), versionedPort.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedPort.getGroupIdentifier());
assertEquals(port.getPosition().getX(), versionedPort.getPosition().getX(), 0);
assertEquals(port.getPosition().getY(), versionedPort.getPosition().getY(), 0);
assertEquals(port.getName(), versionedPort.getName());
assertEquals(portType, versionedPort.getType());
}
}
private void verifyRemotePorts(final Set<RemoteGroupPort> remotePorts,
final Set<VersionedRemoteGroupPort> versionedRemotePorts,
final ComponentType componentType, final String expectedPortGroupIdentifier) {
if (remotePorts.isEmpty()) {
assertTrue(versionedRemotePorts.isEmpty());
} else {
final RemoteGroupPort remotePort = remotePorts.iterator().next();
final VersionedRemoteGroupPort versionedRemotePort = versionedRemotePorts.iterator().next();
assertEquals(flowMapper.getGroupId(remotePort.getIdentifier()), versionedRemotePort.getIdentifier());
assertEquals(expectedPortGroupIdentifier, versionedRemotePort.getGroupIdentifier());
assertEquals(remotePort.getName(), versionedRemotePort.getName());
assertEquals(componentType, versionedRemotePort.getComponentType());
}
}
private void verifyProcessor(final VersionedProcessor versionedProcessor, final ProcessorNode processorNode,
final String expectedGroupIdentifier) {
assertEquals(processorNode.getName(), versionedProcessor.getName());
assertEquals(flowMapper.getGroupId(processorNode.getIdentifier()), versionedProcessor.getIdentifier());
assertEquals(expectedGroupIdentifier, versionedProcessor.getGroupIdentifier());
assertEquals(processorNode.getPosition().getX(), versionedProcessor.getPosition().getX(), 0);
assertEquals(processorNode.getPosition().getY(), versionedProcessor.getPosition().getY(), 0);
final PropertyDescriptor propertyDescriptor = processorNode.getProperties().keySet().iterator().next();
final VersionedPropertyDescriptor versionedPropertyDescriptor =
versionedProcessor.getPropertyDescriptors().get(propertyDescriptor.getName());
assertTrue(versionedProcessor.getProperties().containsKey(propertyDescriptor.getName()));
assertNotNull(versionedPropertyDescriptor);
assertEquals(propertyDescriptor.getName(), versionedPropertyDescriptor.getName());
assertEquals(propertyDescriptor.getDisplayName(), versionedPropertyDescriptor.getDisplayName());
assertEquals(propertyDescriptor.isSensitive(), versionedPropertyDescriptor.isSensitive());
}
}

View File

@ -1457,15 +1457,17 @@ public interface NiFiServiceFacade {
* @param snapshot the Snapshot to persist
* @param externalControllerServiceReferences a mapping of controller service id to ExternalControllerServiceReference for any Controller Service that is referenced in the flow but not included
* in the VersionedProcessGroup
* @oaram ParameterContext the parameter contexts to include
* @param parameterContexts a map of the Parameter Contexts to include keyed by name
* @param comments about the snapshot
* @param expectedVersion the version to save the flow as
* @return the snapshot that represents what was stored in the registry
*
* @throws NiFiCoreException if unable to register the snapshot with the flow registry
*/
VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, Collection<VersionedParameterContext> parameterContexts,
Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, String comments, int expectedVersion);
VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot,
Map<String, VersionedParameterContext> parameterContexts,
Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
String comments, int expectedVersion);
/**
* Updates the Version Control Information on the Process Group with the given ID
@ -1501,6 +1503,24 @@ public interface NiFiServiceFacade {
*/
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows);
/**
* Get the latest Versioned Flow Snapshot from the registry for the Process Group with the given ID
*
* @param processGroupId the ID of the Process Group
* @return the latest Versioned Flow Snapshot for download
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
VersionedFlowSnapshot getVersionedFlowSnapshotByGroupId(String processGroupId);
/**
* Get the current state of the Process Group with the given ID, converted to a Versioned Flow Snapshot
*
* @param processGroupId the ID of the Process Group
* @return the current Process Group converted to a Versioned Flow Snapshot for download
*/
VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(String processGroupId);
/**
* Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
* the ID itself as the name

View File

@ -107,6 +107,7 @@ import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConfigurableComponent;
@ -2511,7 +2512,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// import the template
final Template template = templateDAO.importTemplate(templateDTO, groupId);
// save the flow
controllerFacade.save();
// return the template dto
@ -4304,7 +4305,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// Create a VersionedProcessGroup snapshot of the flow as it is currently.
final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
final Collection<VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup);
final Map<String, VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup);
final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
@ -4395,6 +4396,76 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entity;
}
@Override
public VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
// Create a complete (include descendant flows) VersionedProcessGroup snapshot of the flow as it is
// currently without any registry related fields populated, even if the flow is currently versioned.
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final InstantiatedVersionedProcessGroup nonVersionedProcessGroup =
mapper.mapNonVersionedProcessGroup(processGroup, controllerFacade.getControllerServiceProvider());
// Create a complete (include descendant flows) map of parameter contexts
final Map<String, VersionedParameterContext> parameterContexts =
mapper.mapParameterContexts(processGroup, true);
final VersionedFlowSnapshot nonVersionedFlowSnapshot = new VersionedFlowSnapshot();
nonVersionedFlowSnapshot.setFlowContents(nonVersionedProcessGroup);
nonVersionedFlowSnapshot.setExternalControllerServices(nonVersionedProcessGroup.getExternalControllerServiceReferences());
nonVersionedFlowSnapshot.setParameterContexts(parameterContexts);
nonVersionedFlowSnapshot.setFlowEncodingVersion(RestBasedFlowRegistry.FLOW_ENCODING_VERSION);
return nonVersionedFlowSnapshot;
}
@Override
public VersionedFlowSnapshot getVersionedFlowSnapshotByGroupId(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
return getVersionedFlowSnapshot(versionControlInfo.getRegistryIdentifier(), versionControlInfo.getBucketIdentifier(),
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true);
}
@Override
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
return getVersionedFlowSnapshot(versionControlInfo.getRegistryId(), versionControlInfo.getBucketId(), versionControlInfo.getFlowId(),
versionControlInfo.getVersion(), fetchRemoteFlows);
}
/**
*
* @param registryId the id of the registry to retrieve the versioned flow from
* @param bucketId the id of the bucket within the registry
* @param flowId the id of the flow within the bucket/registry
* @param flowVersion the version of the flow to retrieve
* @param fetchRemoteFlows indicator to include remote flows when retrieving the flow
* @return a VersionedFlowSnapshot from a registry with the given version
*/
private VersionedFlowSnapshot getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId,
final Integer flowVersion, final boolean fetchRemoteFlows) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
}
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(bucketId, flowId, flowVersion, fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException e) {
logger.error(e.getMessage(), e);
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+ bucketId + ", Flow " + flowId + ", Version " + flowVersion);
} catch (final IOException ioe) {
throw new IllegalStateException(
"Failed to communicate with Flow Registry when attempting to retrieve a versioned flow");
}
return snapshot;
}
@Override
public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
@ -4424,30 +4495,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
return versionedGroup;
}
private Collection<VersionedParameterContext> createVersionedParameterContexts(final ProcessGroup processGroup) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final Collection<VersionedParameterContext> parameterContexts = new ArrayList<>();
createVersionedParameterContexts(processGroup, mapper, parameterContexts);
return parameterContexts;
}
private void createVersionedParameterContexts(final ProcessGroup processGroup, final NiFiRegistryFlowMapper mapper, final Collection<VersionedParameterContext> contextCollection) {
final ParameterContext parameterContext = processGroup.getParameterContext();
if (parameterContext != null) {
final VersionedParameterContext versionedContext = mapper.mapParameterContext(processGroup.getParameterContext());
contextCollection.add(versionedContext);
}
for (final ProcessGroup child : processGroup.getProcessGroups()) {
if (child.getVersionControlInformation() == null) {
createVersionedParameterContexts(child, mapper, contextCollection);
}
}
private Map<String, VersionedParameterContext> createVersionedParameterContexts(final ProcessGroup processGroup) {
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
return mapper.mapParameterContexts(processGroup, false);
}
@Override
@ -4472,7 +4527,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
@ -4540,7 +4595,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, final VersionedProcessGroup snapshot,
final Collection<VersionedParameterContext> parameterContexts,
final Map<String, VersionedParameterContext> parameterContexts,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, final String comments,
final int expectedVersion) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
@ -4608,7 +4663,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
@ -4922,25 +4977,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return null;
}
@Override
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
}
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException | IOException e) {
logger.error(e.getMessage(), e);
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}
return snapshot;
}
@Override
public String getFlowRegistryName(final String flowRegistryId) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
@ -5397,6 +5433,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
outputPortDAO.verifyPublicPortUniqueness(portId, portName);
}
/**
* Create a new flow mapper using a mockable method for testing
*
* @param extensionManager the extension manager to create the flow mapper with
* @return a new NiFiRegistryFlowMapper instance
*/
protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final ExtensionManager extensionManager) {
return new NiFiRegistryFlowMapper(extensionManager);
}
/* setters */
public void setProperties(final NiFiProperties properties) {
this.properties = properties;

View File

@ -512,18 +512,20 @@ public abstract class ApplicationResource {
/**
* Authorizes the specified process group.
*
* @param processGroupAuthorizable process group
* @param authorizer authorizer
* @param lookup lookup
* @param action action
* @param authorizeReferencedServices whether to authorize referenced services
* @param authorizeTemplates whether to authorize templates
* @param authorizeControllerServices whether to authorize controller services
* @param processGroupAuthorizable process group
* @param authorizer authorizer
* @param lookup lookup
* @param action action
* @param authorizeReferencedServices whether to authorize referenced services
* @param authorizeTemplates whether to authorize templates
* @param authorizeControllerServices whether to authorize controller services
* @param authorizeTransitiveServices whether to authorize transitive services
* @param authorizeParameterReferences whether to authorize parameter references
*/
protected void authorizeProcessGroup(final ProcessGroupAuthorizable processGroupAuthorizable, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action,
final boolean authorizeReferencedServices, final boolean authorizeTemplates,
final boolean authorizeControllerServices, final boolean authorizeTransitiveServices,
final boolean authorizeParamterReferences) {
final boolean authorizeParameterReferences) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action, user);
@ -542,7 +544,7 @@ public abstract class ApplicationResource {
}
// authorize any referenced parameters if necessary
if (authorizeParamterReferences) {
if (authorizeParameterReferences) {
AuthorizeParameterReference.authorizeParameterReferences(processorAuthorizable, authorizer, processorAuthorizable.getParameterContext(), user);
}
});
@ -570,7 +572,7 @@ public abstract class ApplicationResource {
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(controllerServiceAuthorizable, authorizer, lookup, authorizeTransitiveServices);
}
if (authorizeParamterReferences) {
if (authorizeParameterReferences) {
AuthorizeParameterReference.authorizeParameterReferences(controllerServiceAuthorizable, authorizer, controllerServiceAuthorizable.getParameterContext(), user);
}
});

View File

@ -54,6 +54,7 @@ import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@ -132,6 +133,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
@ -309,6 +311,49 @@ public class ProcessGroupResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
/**
* Retrieves the specified group as a versioned flow snapshot for download.
*
* @param groupId The id of the process group
* @return A processGroupEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/download")
@ApiOperation(
value = "Gets a process group for download",
response = String.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
}
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response exportProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
// ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true,
false, true, false, true);
});
// get the versioned flow
final VersionedFlowSnapshot currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupId(groupId);
// determine the name of the attachment - possible issues with spaces in file names
final VersionedProcessGroup currentVersionedProcessGroup = currentVersionedFlowSnapshot.getFlowContents();
final String flowName = currentVersionedProcessGroup.getName();
final String filename = flowName.replaceAll("\\s", "_") + ".json";
return generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build();
}
/**
* Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry

View File

@ -92,6 +92,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
@ -174,6 +175,66 @@ public class VersionsResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}/download")
@ApiOperation(
value = "Gets the latest version of a Process Group for download",
response = String.class,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}")
}
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response exportFlowVersion(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
// ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true,
false, true, false, true);
});
// get the versioned flow
final VersionedFlowSnapshot versionedFlowSnapshot = serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents();
final String flowName = versionedProcessGroup.getName();
final int flowVersion = versionedFlowSnapshot.getSnapshotMetadata().getVersion();
// clear top-level registry data which doesn't belong in versioned flow download
versionedFlowSnapshot.setFlow(null);
versionedFlowSnapshot.setBucket(null);
versionedFlowSnapshot.setSnapshotMetadata(null);
// clear nested process group registry data which doesn't belong in versioned flow download
sanitizeRegistryInfo(versionedProcessGroup);
// determine the name of the attachment - possible issues with spaces in file names
final String filename = flowName.replaceAll("\\s", "_") + "_" + flowVersion + ".json";
return generateOkResponse(versionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build();
}
/**
* Recursively clear the registry info in the given versioned process group and all nested versioned process groups
*
* @param versionedProcessGroup the process group to sanitize
*/
private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGroup) {
versionedProcessGroup.setVersionedFlowCoordinates(null);
for (final VersionedProcessGroup innerVersionedProcessGroup : versionedProcessGroup.getProcessGroups()) {
sanitizeRegistryInfo(innerVersionedProcessGroup);
}
}
@POST
@Consumes(MediaType.APPLICATION_JSON)

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.web;
import com.google.common.collect.Maps;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
@ -33,14 +34,26 @@ import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Before;
import org.junit.Test;
@ -49,6 +62,8 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -59,6 +74,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -78,6 +94,8 @@ public class StandardNiFiServiceFacadeTest {
private StandardNiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private FlowController flowController;
private ProcessGroupDAO processGroupDAO;
@Before
public void setUp() throws Exception {
@ -155,13 +173,13 @@ public class StandardNiFiServiceFacadeTest {
});
// flow controller
final FlowController controller = mock(FlowController.class);
when(controller.getResource()).thenCallRealMethod();
when(controller.getParentAuthorizable()).thenCallRealMethod();
flowController = mock(FlowController.class);
when(flowController.getResource()).thenCallRealMethod();
when(flowController.getParentAuthorizable()).thenCallRealMethod();
// controller facade
final ControllerFacade controllerFacade = new ControllerFacade();
controllerFacade.setFlowController(controller);
controllerFacade.setFlowController(flowController);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
@ -277,4 +295,56 @@ public class StandardNiFiServiceFacadeTest {
});
}
@Test
public void testGetCurrentFlowSnapshotByGroupId() {
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
final ProcessGroupDAO processGroupDAO = mock(ProcessGroupDAO.class);
serviceFacade.setProcessGroupDAO(processGroupDAO);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
final FlowManager flowManager = mock(FlowManager.class);
final ExtensionManager extensionManager = mock(ExtensionManager.class);
when(flowController.getFlowManager()).thenReturn(flowManager);
when(flowController.getExtensionManager()).thenReturn(extensionManager);
final ControllerServiceProvider controllerServiceProvider = mock(ControllerServiceProvider.class);
when(flowController.getControllerServiceProvider()).thenReturn(controllerServiceProvider);
final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class);
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
// use spy to mock the make() method for generating a new flow mapper to make this testable
final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
final NiFiRegistryFlowMapper flowMapper = mock(NiFiRegistryFlowMapper.class);
when(serviceFacadeSpy.makeNiFiRegistryFlowMapper(extensionManager)).thenReturn(flowMapper);
final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mock(InstantiatedVersionedProcessGroup.class);
when(flowMapper.mapNonVersionedProcessGroup(processGroup, controllerServiceProvider)).thenReturn(nonVersionedProcessGroup);
final String parameterName = "foo";
final VersionedParameterContext versionedParameterContext = mock(VersionedParameterContext.class);
when(versionedParameterContext.getName()).thenReturn(parameterName);
final Map<String, VersionedParameterContext> parameterContexts = Maps.newHashMap();
parameterContexts.put(parameterName, versionedParameterContext);
when(flowMapper.mapParameterContexts(processGroup, true)).thenReturn(parameterContexts);
final ExternalControllerServiceReference externalControllerServiceReference = mock(ExternalControllerServiceReference.class);
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = Maps.newHashMap();
externalControllerServiceReferences.put("test", externalControllerServiceReference);
when(nonVersionedProcessGroup.getExternalControllerServiceReferences()).thenReturn(externalControllerServiceReferences);
final VersionedFlowSnapshot versionedFlowSnapshot = serviceFacadeSpy.getCurrentFlowSnapshotByGroupId(groupId);
assertEquals(nonVersionedProcessGroup, versionedFlowSnapshot.getFlowContents());
assertEquals(1, versionedFlowSnapshot.getParameterContexts().size());
assertEquals(versionedParameterContext, versionedFlowSnapshot.getParameterContexts().get(parameterName));
assertEquals(externalControllerServiceReferences, versionedFlowSnapshot.getExternalControllerServices());
assertEquals(RestBasedFlowRegistry.FLOW_ENCODING_VERSION, versionedFlowSnapshot.getFlowEncodingVersion());
assertNull(versionedFlowSnapshot.getFlow());
assertNull(versionedFlowSnapshot.getBucket());
assertNull(versionedFlowSnapshot.getSnapshotMetadata());
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestProcessGroupResource {
@Test
public void testExportProcessGroup() {
final String groupId = UUID.randomUUID().toString();
final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
when(serviceFacade.getCurrentFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
final String flowName = "flowname";
final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class);
when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);
when(versionedProcessGroup.getName()).thenReturn(flowName);
final ProcessGroupResource resource = getProcessGroupResource(serviceFacade);
final Response response = resource.exportProcessGroup(groupId);
final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
assertEquals(200, response.getStatus());
assertEquals(versionedFlowSnapshot, resultEntity);
}
private ProcessGroupResource getProcessGroupResource(final NiFiServiceFacade serviceFacade) {
final ProcessGroupResource resource = new ProcessGroupResource();
resource.setServiceFacade(serviceFacade);
return resource;
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import com.google.common.collect.Sets;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestVersionsResource {
@Test
public void testExportFlowVersion() {
final String groupId = UUID.randomUUID().toString();
final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
final String flowName = "flowname";
final int flowVersion = 1;
final VersionedProcessGroup versionedProcessGroup = mock(VersionedProcessGroup.class);
final VersionedFlowSnapshotMetadata snapshotMetadata = mock(VersionedFlowSnapshotMetadata.class);
when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);
when(versionedProcessGroup.getName()).thenReturn(flowName);
when(versionedFlowSnapshot.getSnapshotMetadata()).thenReturn(snapshotMetadata);
when(snapshotMetadata.getVersion()).thenReturn(flowVersion);
final VersionedProcessGroup innerVersionedProcessGroup = mock(VersionedProcessGroup.class);
final VersionedProcessGroup innerInnerVersionedProcessGroup = mock(VersionedProcessGroup.class);
when(versionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerVersionedProcessGroup));
when(innerVersionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerInnerVersionedProcessGroup));
final VersionsResource resource = getVersionsResource(serviceFacade);
final Response response = resource.exportFlowVersion(groupId);
final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
assertEquals(200, response.getStatus());
assertEquals(versionedFlowSnapshot, resultEntity);
verify(versionedFlowSnapshot).setFlow(null);
verify(versionedFlowSnapshot).setBucket(null);
verify(versionedFlowSnapshot).setSnapshotMetadata(null);
verify(versionedProcessGroup).setVersionedFlowCoordinates(null);
verify(innerVersionedProcessGroup).setVersionedFlowCoordinates(null);
verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null);
}
private VersionsResource getVersionsResource(final NiFiServiceFacade serviceFacade) {
final VersionsResource resource = new VersionsResource();
resource.setServiceFacade(serviceFacade);
return resource;
}
}

View File

@ -36,6 +36,8 @@ public class OtpAuthenticationFilter extends NiFiAuthenticationFilter {
Pattern.compile("/flowfile-queues/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content");
private static final Pattern TEMPLATE_DOWNLOAD_PATTERN =
Pattern.compile("/templates/[a-f0-9\\-]{36}/download");
private static final Pattern FLOW_DOWNLOAD_PATTERN =
Pattern.compile("/process-groups/[a-f0-9\\-]{36}/download");
protected static final String ACCESS_TOKEN = "access_token";
@ -69,7 +71,8 @@ public class OtpAuthenticationFilter extends NiFiAuthenticationFilter {
}
private boolean isDownloadRequest(final String pathInfo) {
return PROVENANCE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() || QUEUE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() || TEMPLATE_DOWNLOAD_PATTERN.matcher(pathInfo).matches();
return PROVENANCE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() || QUEUE_DOWNLOAD_PATTERN.matcher(pathInfo).matches()
|| TEMPLATE_DOWNLOAD_PATTERN.matcher(pathInfo).matches() || FLOW_DOWNLOAD_PATTERN.matcher(pathInfo).matches();
}
}

View File

@ -150,7 +150,8 @@
urls: {
api: '../nifi-api',
controller: '../nifi-api/controller',
parameterContexts: '../nifi-api/parameter-contexts'
parameterContexts: '../nifi-api/parameter-contexts',
downloadToken: '../nifi-api/access/download-token'
}
};
@ -1377,6 +1378,45 @@
}
},
/**
* Downloads the current flow
*/
downloadFlow: function (selection) {
var processGroupId = null;
if (selection.empty()) {
processGroupId = nfCanvasUtils.getGroupId();
} else if (selection.size() === 1) {
var selectionData = selection.datum();
if (nfCanvasUtils.isProcessGroup(selection)) {
processGroupId = selectionData.id;
}
}
if (processGroupId !== null) {
nfCommon.getAccessToken(config.urls.downloadToken).done(function (downloadToken) {
var parameters = {};
// conditionally include the download token
if (!nfCommon.isBlank(downloadToken)) {
parameters['access_token'] = downloadToken;
}
// open the url
var uri = '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/download';
if (!$.isEmptyObject(parameters)) {
uri += ('?' + $.param(parameters));
}
window.open(uri);
}).fail(function () {
nfDialog.showOkDialog({
headerText: 'Download Flow',
dialogContent: 'Unable to generate access token for downloading content.'
});
});
}
},
/**
* Disconnects a Process Group from flow versioning.
*/

View File

@ -429,6 +429,18 @@
return nfCanvasUtils.isProcessGroup(selection);
};
/**
* Returns whether the process group supports downloading the current flow.
*
* @param selection
* @returns {boolean}
*/
var supportsDownloadFlow = function (selection) {
// download is allowed when either nothing is selected or a single readable process group is selected
return (selection.empty() && nfCanvasUtils.canReadCurrentGroup()) ||
(selection.size() === 1 && nfCanvasUtils.isProcessGroup(selection) && nfCanvasUtils.canRead(selection));
};
/**
* Determines whether the current selection supports flow versioning.
*
@ -494,36 +506,11 @@
* @returns {boolean}
*/
var supportsCommitFlowVersion = function (selection) {
// ensure this selection supports flow versioning above
if (supportsFlowVersioning(selection) === false) {
return false;
}
var versionControlInformation;
if (selection.empty()) {
// check bread crumbs for version control information in the current group
var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
if (breadcrumbEntities.length > 0) {
var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
if (breadcrumbEntity.permissions.canRead) {
versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
} else {
return false;
}
} else {
return false;
}
} else {
var processGroupData = selection.datum();
versionControlInformation = processGroupData.component.versionControlInformation;
}
if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
return false;
}
var versionControlInformation = getFlowVersionControlInformation(selection);
// check the selection for version control information
return versionControlInformation.state === 'LOCALLY_MODIFIED';
return versionControlInformation !== null &&
versionControlInformation.state === 'LOCALLY_MODIFIED';
};
/**
@ -533,36 +520,11 @@
* @returns {boolean}
*/
var supportsForceCommitFlowVersion = function (selection) {
// ensure this selection supports flow versioning above
if (supportsFlowVersioning(selection) === false) {
return false;
}
var versionControlInformation;
if (selection.empty()) {
// check bread crumbs for version control information in the current group
var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
if (breadcrumbEntities.length > 0) {
var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
if (breadcrumbEntity.permissions.canRead) {
versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
} else {
return false;
}
} else {
return false;
}
} else {
var processGroupData = selection.datum();
versionControlInformation = processGroupData.component.versionControlInformation;
}
if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
return false;
}
var versionControlInformation = getFlowVersionControlInformation(selection);
// check the selection for version control information
return versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
return versionControlInformation !== null &&
versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
};
@ -573,36 +535,12 @@
* @returns {boolean}
*/
var hasLocalChanges = function (selection) {
// ensure this selection supports flow versioning above
if (supportsFlowVersioning(selection) === false) {
return false;
}
var versionControlInformation;
if (selection.empty()) {
// check bread crumbs for version control information in the current group
var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
if (breadcrumbEntities.length > 0) {
var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
if (breadcrumbEntity.permissions.canRead) {
versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
} else {
return false;
}
} else {
return false;
}
} else {
var processGroupData = selection.datum();
versionControlInformation = processGroupData.component.versionControlInformation;
}
if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
return false;
}
var versionControlInformation = getFlowVersionControlInformation(selection);
// check the selection for version control information
return versionControlInformation.state === 'LOCALLY_MODIFIED' || versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
return versionControlInformation !== null &&
(versionControlInformation.state === 'LOCALLY_MODIFIED' ||
versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE');
};
/**
@ -612,9 +550,36 @@
* @returns {boolean}
*/
var supportsChangeFlowVersion = function (selection) {
// ensure this selection supports flow versioning above
var versionControlInformation = getFlowVersionControlInformation(selection);
return versionControlInformation !== null &&
versionControlInformation.state !== 'LOCALLY_MODIFIED' &&
versionControlInformation.state !== 'LOCALLY_MODIFIED_AND_STALE' &&
versionControlInformation.state !== 'SYNC_FAILURE';
};
/**
* Determines whether the current selection supports stopping flow versioning.
*
* @param selection
* @returns {boolean}
*/
var supportsStopFlowVersioning = function (selection) {
var versionControlInformation = getFlowVersionControlInformation(selection);
return versionControlInformation !== null;
};
/**
* Convenience function to perform all flow versioning pre-checks and retrieve
* valid version information.
*
* @param selection
*/
var getFlowVersionControlInformation = function (selection) {
// ensure this selection supports flow versioning
if (supportsFlowVersioning(selection) === false) {
return false;
return null;
}
var versionControlInformation;
@ -626,56 +591,22 @@
if (breadcrumbEntity.permissions.canRead) {
versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
} else {
return false;
return null;
}
} else {
return false;
return null;
}
} else {
var processGroupData = selection.datum();
versionControlInformation = processGroupData.component.versionControlInformation;
}
if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
return false;
if (nfCommon.isDefinedAndNotNull(versionControlInformation)) {
return versionControlInformation;;
}
// check the selection for version control information
return versionControlInformation.state !== 'LOCALLY_MODIFIED' &&
versionControlInformation.state !== 'LOCALLY_MODIFIED_AND_STALE' &&
versionControlInformation.state !== 'SYNC_FAILURE';
};
/**
* Determines whether the current selection supports stopping flow versioning.
*
* @param selection
*/
var supportsStopFlowVersioning = function (selection) {
// ensure this selection supports flow versioning above
if (supportsFlowVersioning(selection) === false) {
return false;
}
if (selection.empty()) {
// check bread crumbs for version control information in the current group
var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
if (breadcrumbEntities.length > 0) {
var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
if (breadcrumbEntity.permissions.canRead) {
return nfCommon.isDefinedAndNotNull(breadcrumbEntity.breadcrumb.versionControlInformation);
} else {
return false;
}
} else {
return false;
}
}
// check the selection for version control information
var processGroupData = selection.datum();
return nfCommon.isDefinedAndNotNull(processGroupData.component.versionControlInformation);
};
return null;
}
/**
* Determines whether the current selection could have provenance.
@ -910,6 +841,8 @@
{id: 'move-into-parent-menu-item', condition: canMoveToParent, menuItem: {clazz: 'fa fa-arrows', text: 'Move to parent group', action: 'moveIntoParent'}},
{id: 'group-menu-item', condition: canGroup, menuItem: {clazz: 'icon icon-group', text: 'Group', action: 'group'}},
{separator: true},
{id: 'download-menu-item', condition: supportsDownloadFlow, menuItem: {clazz: 'fa', text: 'Download flow', action: 'downloadFlow'}},
{separator: true},
{id: 'upload-template-menu-item', condition: canUploadTemplate, menuItem: {clazz: 'icon icon-template-import', text: 'Upload template', action: 'uploadTemplate'}},
{id: 'template-menu-item', condition: canCreateTemplate, menuItem: {clazz: 'icon icon-template-save', text: 'Create template', action: 'template'}},
{separator: true},