NIFI-4436: Added additional endpoints; bug fixes

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-10-30 16:35:59 -04:00 committed by Bryan Bende
parent 7a0a900a0f
commit 6aa8b5c61c
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
29 changed files with 1181 additions and 369 deletions

View File

@ -16,11 +16,13 @@
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import javax.xml.bind.annotation.XmlRootElement;
import io.swagger.annotations.ApiModelProperty;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO.
@ -30,6 +32,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
private ProcessGroupDTO component;
private ProcessGroupStatusDTO status;
private VersionedFlowSnapshot versionedFlowSnapshot;
private Integer runningCount;
private Integer stoppedCount;
@ -46,10 +49,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
*
* @return The ProcessGroupDTO object
*/
@Override
public ProcessGroupDTO getComponent() {
return component;
}
@Override
public void setComponent(ProcessGroupDTO component) {
this.component = component;
}
@ -180,4 +185,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
this.inactiveRemotePortCount = inactiveRemotePortCount;
}
@ApiModelProperty(value = "Returns the Versioned Flow that describes the contents of the Versioned Flow to be imported", readOnly = true)
public VersionedFlowSnapshot getVersionedFlowSnapshot() {
return versionedFlowSnapshot;
}
public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) {
this.versionedFlowSnapshot = versionedFlowSnapshot;
}
}

View File

@ -783,8 +783,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
* and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
* throw an IllegalStateException
* @param updateSettings whether or not to update the process group's name and positions
*/
void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
/**
* Verifies a template with the specified name can be created.

View File

@ -24,17 +24,47 @@ import java.io.IOException;
import java.util.Set;
public interface FlowRegistry {
/**
* @return the ID of the Flow Registry
*/
String getIdentifier();
/**
* @return the description of the Flow Registry
*/
String getDescription();
/**
* Updates the Flow Registry's description
*
* @param description the description of the Flow Registry
*/
void setDescription(String description);
/**
* @return the URL of the Flow Registry
*/
String getURL();
/**
* Updates the Flow Registry's URL
*
* @param url the URL of the Flow Registry
*/
void setURL(String url);
/**
* @return the name of the Flow Registry
*/
String getName();
/**
* Updates the name of the Flow Registry
*
* @param name the name of the Flow Registry
*/
void setName(String name);
/**
* Gets the buckets for the specified user.
*

View File

@ -34,4 +34,10 @@ public interface FlowRegistryClient {
}
Set<String> getRegistryIdentifiers();
void addFlowRegistry(FlowRegistry registry);
FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description);
FlowRegistry removeFlowRegistry(String registryId);
}

View File

@ -165,6 +165,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@ -2128,6 +2130,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
if (!supportedBundles.contains(requiredCoordinate)) {
throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
}
}
private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (templateContents.getProcessors() != null) {
templateContents.getProcessors().forEach(processor -> {
@ -2150,6 +2159,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (versionedFlow.getProcessors() != null) {
versionedFlow.getProcessors().forEach(processor -> {
if (processor.getBundle() == null) {
throw new IllegalArgumentException("Processor bundle must be specified.");
}
if (supportedTypes.containsKey(processor.getType())) {
verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
} else {
throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
}
});
}
if (versionedFlow.getProcessGroups() != null) {
versionedFlow.getProcessGroups().forEach(processGroup -> {
verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
});
}
}
private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (templateContents.getControllerServices() != null) {
templateContents.getControllerServices().forEach(controllerService -> {
@ -2172,6 +2203,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (versionedFlow.getControllerServices() != null) {
versionedFlow.getControllerServices().forEach(controllerService -> {
if (supportedTypes.containsKey(controllerService.getType())) {
if (controllerService.getBundle() == null) {
throw new IllegalArgumentException("Controller Service bundle must be specified.");
}
verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
} else {
throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
}
});
}
if (versionedFlow.getProcessGroups() != null) {
versionedFlow.getProcessGroups().forEach(processGroup -> {
verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
});
}
}
public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
@ -2210,6 +2263,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
final String name = c.getName();
processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
final String name = c.getName();
controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
final Set<String> prioritizerClasses = new HashSet<>();
for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
prioritizerClasses.add(c.getName());
}
final Set<VersionedConnection> allConns = new HashSet<>();
allConns.addAll(versionedFlow.getConnections());
for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
allConns.addAll(findAllConnections(childGroup));
}
for (final VersionedConnection conn : allConns) {
final List<String> prioritizers = conn.getPrioritizers();
if (prioritizers != null) {
for (final String prioritizer : prioritizers) {
if (!prioritizerClasses.contains(prioritizer)) {
throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
}
}
}
}
}
/**
* <p>
* Verifies that the given DTO is valid, according to the following:
@ -2270,6 +2361,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return conns;
}
private Set<VersionedConnection> findAllConnections(final VersionedProcessGroup group) {
final Set<VersionedConnection> conns = new HashSet<>();
for (final VersionedConnection connection : group.getConnections()) {
conns.add(connection);
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
conns.addAll(findAllConnections(childGroup));
}
return conns;
}
//
// Processor access
//

View File

@ -85,6 +85,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
@ -184,7 +185,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
try {
if (flowAlreadySynchronized) {
existingFlow = toBytes(controller);
existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty();
existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty()
&& controller.getAllReportingTasks().isEmpty()
&& controller.getAllControllerServices().isEmpty()
&& controller.getFlowRegistryClient().getRegistryIdentifiers().isEmpty();
} else {
existingFlow = readFlowFromDisk();
if (existingFlow == null || existingFlow.length == 0) {
@ -220,10 +224,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
}
final boolean registriesPresent;
final Element registriesElement = DomUtils.getChild(rootElement, "registries");
if (registriesElement == null) {
registriesPresent = false;
} else {
final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
registriesPresent = !flowRegistryElems.isEmpty();
}
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
existingFlowEmpty = taskElements.isEmpty()
&& unrootedControllerServiceElements.isEmpty()
&& isEmpty(rootGroupDto)
&& registriesPresent;
logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
}
}
@ -318,6 +334,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the root group XML element
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
if (!flowAlreadySynchronized || existingFlowEmpty) {
final Element registriesElement = DomUtils.getChild(rootElement, "registries");
if (registriesElement != null) {
final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
for (final Element flowRegistryElement : flowRegistryElems) {
final String registryId = getString(flowRegistryElement, "id");
final String registryName = getString(flowRegistryElement, "name");
final String registryUrl = getString(flowRegistryElement, "url");
final String description = getString(flowRegistryElement, "description");
final FlowRegistryClient client = controller.getFlowRegistryClient();
client.addFlowRegistry(registryId, registryName, registryUrl, description);
}
}
}
// if this controller isn't initialized or its empty, add the root group, otherwise update
final ProcessGroup rootGroup;
if (!flowAlreadySynchronized || existingFlowEmpty) {

View File

@ -39,6 +39,8 @@ import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
@ -74,7 +76,7 @@ import java.util.concurrent.TimeUnit;
*/
public class StandardFlowSerializer implements FlowSerializer {
private static final String MAX_ENCODING_VERSION = "1.2";
private static final String MAX_ENCODING_VERSION = "1.3";
private final StringEncryptor encryptor;
@ -98,6 +100,11 @@ public class StandardFlowSerializer implements FlowSerializer {
doc.appendChild(rootNode);
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
final Element registriesElement = doc.createElement("registries");
rootNode.appendChild(registriesElement);
addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
// Add root-level controller services
@ -130,6 +137,26 @@ public class StandardFlowSerializer implements FlowSerializer {
}
}
private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
for (final String registryId : registryClient.getRegistryIdentifiers()) {
final FlowRegistry flowRegistry = registryClient.getFlowRegistry(registryId);
final Element registryElement = parentElement.getOwnerDocument().createElement("flowRegistry");
parentElement.appendChild(registryElement);
addStringElement(registryElement, "id", flowRegistry.getIdentifier());
addStringElement(registryElement, "name", flowRegistry.getName());
addStringElement(registryElement, "url", flowRegistry.getURL());
addStringElement(registryElement, "description", flowRegistry.getDescription());
}
}
private void addStringElement(final Element parentElement, final String elementName, final String value) {
final Element childElement = parentElement.getOwnerDocument().createElement(elementName);
childElement.setTextContent(value);
parentElement.appendChild(childElement);
}
private void addSize(final Element parentElement, final Size size) {
final Element element = parentElement.getOwnerDocument().createElement("size");
element.setAttribute("width", String.valueOf(size.getWidth()));

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
@ -198,6 +199,21 @@ public class FingerprintFactory {
}
private StringBuilder addFlowControllerFingerprint(final StringBuilder builder, final Element flowControllerElem, final FlowController controller) {
// registries
final Element registriesElement = DomUtils.getChild(flowControllerElem, "registries");
if (registriesElement == null) {
builder.append("NO_VALUE");
} else {
final List<Element> flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
if (flowRegistryElems.isEmpty()) {
builder.append("NO_VALUE");
} else {
for (final Element flowRegistryElement : flowRegistryElems) {
addFlowRegistryFingerprint(builder, flowRegistryElement);
}
}
}
// root group
final Element rootGroupElem = (Element) DomUtils.getChildNodesByTagName(flowControllerElem, "rootGroup").item(0);
addProcessGroupFingerprint(builder, rootGroupElem, controller);
@ -265,6 +281,11 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addFlowRegistryFingerprint(final StringBuilder builder, final Element flowRegistryElement) {
Stream.of("id", "name", "url", "description").forEach(elementName -> appendFirstValue(builder, DomUtils.getChildNodesByTagName(flowRegistryElement, elementName)));
return builder;
}
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));

View File

@ -72,7 +72,7 @@ import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.UnknownResourceException;
import org.apache.nifi.registry.flow.VersionControlInformation;
@ -2835,11 +2835,14 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public void disconnectVersionControl() {
writeLock.lock();
try {
// TODO remove version component ids from each component (until another versioned PG is encountered)
this.versionControlInfo.set(null);
// remove version component ids from each component (until another versioned PG is encountered)
applyVersionedComponentIds(this, id -> null);
} finally {
writeLock.unlock();
}
@ -2850,36 +2853,41 @@ public final class StandardProcessGroup implements ProcessGroup {
return;
}
processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier()));
applyVersionedComponentIds(processGroup, versionedComponentIds::get);
}
private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
processGroup.getConnections().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getProcessors().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getInputPorts().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getOutputPorts().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getLabels().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getFunnels().stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getControllerServices(false).stream()
.forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getRemoteProcessGroups().stream()
.forEach(rpg -> {
rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier()));
rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
rpg.getInputPorts().stream()
.forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
.forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
rpg.getOutputPorts().stream()
.forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
.forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
});
processGroup.getProcessGroups().stream()
.forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds));
.filter(childGroup -> childGroup.getVersionControlInformation() != null)
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
@ -2931,10 +2939,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
@ -2950,15 +2958,15 @@ public final class StandardProcessGroup implements ProcessGroup {
.map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
.collect(Collectors.toSet());
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
} else {
// TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient
// than having to remember to enable DEBUG level logging every time a full build is done.
LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
if (LOG.isInfoEnabled()) {
final String differencesByLine = flowComparison.getDifferences().stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
}
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} finally {
@ -2968,10 +2976,14 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException {
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
group.setComments(proposed.getComments());
group.setName(proposed.getName());
if (updateName) {
group.setName(proposed.getName());
}
if (updatePosition && proposed.getPosition() != null) {
group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
}
@ -2998,7 +3010,7 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVariables(updatedVariableMap);
final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates();
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
final String bucketId = remoteCoordinates.getBucketId();
@ -3022,7 +3034,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed);
LOG.info("Added {} to {}", added, this);
} else {
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true);
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
LOG.info("Updated {}", childGroup);
}
@ -3136,14 +3148,29 @@ public final class StandardProcessGroup implements ProcessGroup {
final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed);
final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
.map(relName -> added.getRelationship(relName))
.collect(Collectors.toSet());
autoTerminatedRelationships.put(added, proposedAutoTerminated);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor);
final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
.map(relName -> processor.getRelationship(relName))
.collect(Collectors.toSet());
if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
autoTerminatedRelationships.put(processor, proposedAutoTerminated);
}
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@ -3205,6 +3232,13 @@ public final class StandardProcessGroup implements ProcessGroup {
group.removeConnection(connection);
}
// Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
// We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
// then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
// Connection for that relationship exists. This will throw an Exception.
autoTerminatedRelationships.forEach((proc, rels) -> proc.setAutoTerminatedRelationships(rels));
// Remove all controller services no longer in use
for (final String removedVersionedId : controllerServicesRemoved) {
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", service, group);
@ -3276,7 +3310,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier());
addProcessGroup(group);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
return group;
}
@ -3535,10 +3569,6 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setYieldPeriod(proposed.getYieldDuration());
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
.map(relName -> processor.getRelationship(relName))
.collect(Collectors.toSet()));
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
@ -3547,6 +3577,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
final Map<String, String> fullPropertyMap = new HashMap<>();
for (final PropertyDescriptor property : currentProperties.keySet()) {
@ -3646,7 +3677,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public String getName() {
return "Flow Under Version Control";
return "Versioned Flow";
}
};
@ -3659,7 +3690,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.findAny()
.isPresent();
LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
LOG.debug("There are {} differences between this Local FLow and the Versioned Flow: {}", differences.size(), differences);
return Optional.of(modified);
}
@ -3669,27 +3700,24 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.lock();
try {
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo == null) {
throw new IllegalStateException("Cannot update the Version of the flow for " + this
+ " because the Process Group is not currently under Version Control");
}
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}
if (verifyNotDirty) {
final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
if (!modifiedOption.isPresent()) {
throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+ "has not yet been synchronized with the Flow Registry. The Process Group must be"
+ " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
if (versionControlInfo != null) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}
if (Boolean.TRUE.equals(modifiedOption.get())) {
throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+ " restored to its original form before changing the version");
if (verifyNotDirty) {
final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
if (!modifiedOption.isPresent()) {
throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+ "has not yet been synchronized with the Flow Registry. The Process Group must be"
+ " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
}
if (Boolean.TRUE.equals(modifiedOption.get())) {
throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+ " restored to its original form before changing the version");
}
}
}

View File

@ -17,22 +17,13 @@
package org.apache.nifi.registry.flow;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.net.URI;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -44,22 +35,44 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* A simple file-based implementation of a Flow Registry Client. Rather than interacting
* with an actual Flow Registry, this implementation simply reads flows from disk and writes
* them to disk. It is not meant for any production use but is available for testing purposes.
*/
public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
public class FileBasedFlowRegistry implements FlowRegistry {
private final File directory;
private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
private final JsonFactory jsonFactory = new JsonFactory();
private final String id;
private volatile String name = "Local Registry";
private volatile String url = "file:" + (new File("..").getAbsolutePath());
private volatile String description = "Default file-based Flow Registry";
public FileBasedFlowRegistry(final String id, final String url) throws IOException {
final URI uri = URI.create(url);
if (!uri.getScheme().equalsIgnoreCase("file")) {
throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
}
this.directory = new File(URI.create(url).getPath());
public FileBasedFlowRegistryClient(final File directory) throws IOException {
if (!directory.exists() && !directory.mkdirs()) {
throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
}
this.directory = directory;
this.id = id;
this.url = url;
recoverBuckets();
}
@ -101,23 +114,14 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
}
}
@Override
public FlowRegistry getFlowRegistry(final String registryId) {
if (!"default".equals(registryId)) {
return null;
}
return this;
}
@Override
public String getURL() {
return directory.toURI().toString();
return url;
}
@Override
public String getName() {
return "Local Registry";
return name;
}
@Override
@ -138,12 +142,29 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
bucket.setName("Bucket '" + bucketIdentifier + "'");
bucket.setCreatedTimestamp(creation);
final Set<VersionedFlow> versionedFlows = new HashSet<>();
final File[] flowDirs = bucketDirectory.listFiles();
if (flowDirs != null) {
for (final File flowDir : flowDirs) {
final String flowIdentifier = flowDir.getName();
try {
final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
versionedFlows.add(versionedFlow);
} catch (UnknownResourceException e) {
continue;
}
}
}
bucket.setVersionedFlows(versionedFlows);
buckets.add(bucket);
}
return buckets;
}
@Override
public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
Objects.requireNonNull(flow);
@ -241,7 +262,7 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
final File contentsFile = new File(snapshotDir, "flow.xml");
try (final OutputStream out = new FileOutputStream(contentsFile);
final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) {
final JsonGenerator generator = jsonFactory.createGenerator(out)) {
generator.setCodec(new ObjectMapper());
generator.setPrettyPrinter(new DefaultPrettyPrinter());
generator.writeObject(snapshot);
@ -269,11 +290,6 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
return response;
}
@Override
public Set<String> getRegistryIdentifiers() {
return Collections.singleton("default");
}
@Override
public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
// Verify that the bucket exists
@ -400,6 +416,8 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
flow.setSnapshotMetadata(snapshotMetadataSet);
final File[] versionDirs = flowDir.listFiles();
flow.setVersionCount(versionDirs.length);
for (final File file : versionDirs) {
if (!file.isDirectory()) {
continue;
@ -432,4 +450,29 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
return flow;
}
@Override
public String getIdentifier() {
return id;
}
@Override
public String getDescription() {
return description;
}
@Override
public void setDescription(String description) {
this.description = description;
}
@Override
public void setURL(String url) {
this.url = url;
}
@Override
public void setName(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
import java.io.IOException;
import java.net.URI;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class StandardFlowRegistryClient implements FlowRegistryClient {
private ConcurrentMap<String, FlowRegistry> registryById = new ConcurrentHashMap<>();
@Override
public FlowRegistry getFlowRegistry(String registryId) {
return registryById.get(registryId);
}
@Override
public Set<String> getRegistryIdentifiers() {
return registryById.keySet();
}
@Override
public void addFlowRegistry(final FlowRegistry registry) {
final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry);
if (existing != null) {
throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier());
}
}
@Override
public FlowRegistry addFlowRegistry(final String registryId, final String registryName, final String registryUrl, final String description) {
final URI uri = URI.create(registryUrl);
final String uriScheme = uri.getScheme();
final FlowRegistry registry;
if (uriScheme.equalsIgnoreCase("file")) {
try {
registry = new FileBasedFlowRegistry(registryId, registryUrl);
} catch (IOException e) {
throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e);
}
registry.setName(registryName);
registry.setDescription(description);
} else {
throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl
+ " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme);
}
addFlowRegistry(registry);
return registry;
}
@Override
public FlowRegistry removeFlowRegistry(final String registryId) {
return registryById.remove(registryId);
}
}

View File

@ -28,10 +28,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@ -46,6 +48,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
@ -56,13 +59,14 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.Position;
import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
@ -78,9 +82,91 @@ public class NiFiRegistryFlowMapper {
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) {
versionedComponentIds.clear();
return mapGroup(group, registryClient, true);
final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true);
// TODO: Test that this works properly
populateReferencedAncestorServices(group, mapped);
// TODO: Test that this works properly
populateReferencedAncestorVariables(group, mapped);
return mapped;
}
private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
final Set<ControllerServiceNode> ancestorControllerServices = group.getControllerServices(true);
ancestorControllerServices.remove(group.getControllerServices(false));
final Map<String, ControllerServiceNode> ancestorServicesById = ancestorControllerServices.stream()
.collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity()));
final Set<ControllerServiceNode> referenced = new HashSet<>();
for (final ProcessorNode processor : group.findAllProcessors()) {
findReferencedServices(processor, ancestorServicesById, referenced);
}
for (final ControllerServiceNode service : group.findAllControllerServices()) {
findReferencedServices(service, ancestorServicesById, referenced);
}
final Set<VersionedControllerService> versionedServices = referenced.stream().map(this::mapControllerService)
.collect(Collectors.toCollection(LinkedHashSet::new));
versionedGroup.getControllerServices().addAll(versionedServices);
}
private Set<ControllerServiceNode> findReferencedServices(final ConfiguredComponent component, final Map<String, ControllerServiceNode> ancestorServicesById,
final Set<ControllerServiceNode> referenced) {
for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String serviceId = entry.getValue();
final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId);
if (serviceNode != null) {
referenced.add(serviceNode);
referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced));
}
}
}
return referenced;
}
private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
final Set<String> ancestorVariableNames = new HashSet<>();
populateVariableNames(group.getParent(), ancestorVariableNames);
final Map<String, String> implicitlyDefinedVariables = new HashMap<>();
for (final String variableName : ancestorVariableNames) {
final boolean isReferenced = !group.getComponentsAffectedByVariable(variableName).isEmpty();
if (isReferenced) {
final String value = group.getVariableRegistry().getVariableValue(variableName);
implicitlyDefinedVariables.put(variableName, value);
}
}
if (!implicitlyDefinedVariables.isEmpty()) {
// Merge the implicit variables with the explicitly defined variables for the Process Group
// and set those as the Versioned Group's variables.
implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
versionedGroup.setVariables(implicitlyDefinedVariables);
}
}
private void populateVariableNames(final ProcessGroup group, final Set<String> variableNames) {
if (group == null) {
return;
}
group.getVariableRegistry().getVariableMap().keySet().stream()
.map(VariableDescriptor::getName)
.forEach(variableNames::add);
populateVariableNames(group.getParent(), variableNames);
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
@ -95,7 +181,7 @@ public class NiFiRegistryFlowMapper {
if (!topLevel) {
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {
final RemoteFlowCoordinates coordinates = new RemoteFlowCoordinates();
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
final String registryId = versionControlInfo.getRegistryIdentifier();
final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
if (registry == null) {
@ -237,6 +323,7 @@ public class NiFiRegistryFlowMapper {
private Map<String, String> mapProperties(final ConfiguredComponent component) {
final Map<String, String> mapped = new HashMap<>();
component.getProperties().keySet().stream()
.filter(property -> !property.isSensitive())
.forEach(property -> {
String value = component.getProperty(property);
if (value == null) {
@ -312,7 +399,7 @@ public class NiFiRegistryFlowMapper {
versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks());
versionedPort.setName(port.getName());
versionedPort.setPosition(mapPosition(port.getPosition()));
versionedPort.setType(PortType.valueOf(port.getComponentType()));
versionedPort.setType(PortType.valueOf(port.getConnectableType().name()));
return versionedPort;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.util;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.api.dto.BundleDTO;
import java.util.List;
@ -28,7 +29,6 @@ import java.util.stream.Collectors;
* Utility class for Bundles.
*/
public final class BundleUtils {
private static BundleCoordinate findBundleForType(final String type, final BundleCoordinate desiredCoordinate) {
final List<Bundle> bundles = ExtensionManager.getBundles(type);
if (bundles.isEmpty()) {
@ -140,4 +140,50 @@ public final class BundleUtils {
}
}
/**
* Discovers the compatible bundle details for the components in the specified Versioned Process Group and updates the Versioned Process Group
* to reflect the appropriate bundles.
*
* @param versionedGroup the versioned group
*/
public static void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) {
if (versionedGroup.getProcessors() != null) {
versionedGroup.getProcessors().forEach(processor -> {
final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle()));
final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle();
bundle.setArtifact(coordinate.getId());
bundle.setGroup(coordinate.getGroup());
bundle.setVersion(coordinate.getVersion());
processor.setBundle(bundle);
});
}
if (versionedGroup.getControllerServices() != null) {
versionedGroup.getControllerServices().forEach(controllerService -> {
final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle()));
final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle();
bundle.setArtifact(coordinate.getId());
bundle.setGroup(coordinate.getGroup());
bundle.setVersion(coordinate.getVersion());
controllerService.setBundle(bundle);
});
}
if (versionedGroup.getProcessGroups() != null) {
versionedGroup.getProcessGroups().forEach(processGroup -> {
discoverCompatibleBundles(processGroup);
});
}
}
public static BundleDTO createBundleDto(final org.apache.nifi.registry.flow.Bundle bundle) {
final BundleDTO dto = new BundleDTO();
dto.setArtifact(bundle.getArtifact());
dto.setGroup(dto.getGroup());
dto.setVersion(dto.getVersion());
return dto;
}
}

View File

@ -26,6 +26,8 @@
</xs:sequence>
</xs:choice>
<xs:element name="registries" type="RegistriesType" minOccurs="0" maxOccurs="1" />
<!-- Groupings of Processors/Ports -->
<xs:element name="rootGroup" type="RootProcessGroupType" />
@ -38,6 +40,21 @@
<xs:attribute name="encoding-version" type="xs:string"/>
</xs:complexType>
<xs:complexType name="RegistriesType">
<xs:sequence>
<xs:element name="flowRegistry" type="FlowRegistryType" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
<xs:complexType name="FlowRegistryType">
<xs:sequence>
<xs:element name="id" type="NonEmptyStringType" />
<xs:element name="name" type="NonEmptyStringType" />
<xs:element name="url" type="NonEmptyStringType" />
<xs:element name="description" type="NonEmptyStringType" />
</xs:sequence>
</xs:complexType>
<!-- the processor "id" is a key that should be valid within each flowController-->
<xs:complexType name="ProcessorType">
<xs:sequence>

View File

@ -36,9 +36,7 @@
</bean>
<!-- flow registry -->
<bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.FileBasedFlowRegistryClient">
<constructor-arg index="0" type="java.io.File" value="../flowRegistry" />
</bean>
<bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.StandardFlowRegistryClient" />
<!-- flow controller -->
<bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean">

View File

@ -654,7 +654,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) {
public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) {
}
@Override

View File

@ -32,6 +32,8 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position;
@ -208,9 +210,11 @@ public class FingerprintFactoryTest {
when(component.getProxyPort()).thenReturn(null);
when(component.getProxyUser()).thenReturn(null);
when(component.getProxyPassword()).thenReturn(null);
when(component.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "id" +
"NO_VALUE" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"eth0" +
"10 sec" +
@ -245,9 +249,11 @@ public class FingerprintFactoryTest {
when(component.getProxyPort()).thenReturn(3128);
when(component.getProxyUser()).thenReturn("proxy-user");
when(component.getProxyPassword()).thenReturn("proxy-pass");
when(component.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "id" +
"NO_VALUE" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"NO_VALUE" +
"10 sec" +
@ -273,6 +279,7 @@ public class FingerprintFactoryTest {
when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3));
when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi");
when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(groupComponent.getVersionedComponentId()).thenReturn(Optional.empty());
final RemoteGroupPort portComponent = mock(RemoteGroupPort.class);
when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent));
@ -288,6 +295,7 @@ public class FingerprintFactoryTest {
when(portComponent.getBatchDuration()).thenReturn("10sec");
// Serializer doesn't serialize if a port doesn't have any connection.
when(portComponent.hasIncomingConnection()).thenReturn(true);
when(portComponent.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "portId" +

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.api.dto.BundleDTO;
public class FlowRegistryUtils {
public static boolean containsRestrictedComponent(final VersionedProcessGroup group) {
final Set<Tuple<String, BundleCoordinate>> componentTypes = new HashSet<>();
populateComponentTypes(group, componentTypes);
for (final Tuple<String, BundleCoordinate> tuple : componentTypes) {
final ConfigurableComponent component = ExtensionManager.getTempComponent(tuple.getKey(), tuple.getValue());
if (component == null) {
throw new NiFiCoreException("Could not create an instance of component " + tuple.getKey() + " using bundle coordinates " + tuple.getValue());
}
final boolean isRestricted = component.getClass().isAnnotationPresent(Restricted.class);
if (isRestricted) {
return true;
}
}
return false;
}
private static void populateComponentTypes(final VersionedProcessGroup group, final Set<Tuple<String, BundleCoordinate>> componentTypes) {
group.getProcessors().stream()
.map(versionedProc -> new Tuple<>(versionedProc.getType(), createBundleCoordinate(versionedProc.getBundle())))
.forEach(componentTypes::add);
group.getControllerServices().stream()
.map(versionedSvc -> new Tuple<>(versionedSvc.getType(), createBundleCoordinate(versionedSvc.getBundle())))
.forEach(componentTypes::add);
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
populateComponentTypes(childGroup, componentTypes);
}
}
public static BundleCoordinate createBundleCoordinate(final Bundle bundle) {
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
public static BundleDTO createBundleDto(final Bundle bundle) {
return new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
}

View File

@ -406,6 +406,13 @@ public interface NiFiServiceFacade {
*/
void verifyComponentTypes(FlowSnippetDTO snippet);
/**
* Verifies the types of components in a versioned process group
*
* @param versionedGroup the proposed process group
*/
void verifyComponentTypes(VersionedProcessGroup versionedGroup);
/**
* Creates a new Template based off the specified snippet.
*
@ -1385,10 +1392,11 @@ public interface NiFiServiceFacade {
* @param versionControlInfo the Version Control information
* @param snapshot the new snapshot
* @param componentIdSeed the seed to use for generating new component ID's
* @param updateSettings whether or not the process group's name and position should be updated
* @return the Process Group
*/
ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
boolean verifyNotModified);
ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
boolean verifyNotModified, boolean updateSettings);
// ----------------------------------------
// Component state methods

View File

@ -16,7 +16,32 @@
*/
package org.apache.nifi.web;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@ -58,6 +83,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@ -85,10 +111,9 @@ import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.UnknownResourceException;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
@ -103,20 +128,19 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@ -239,6 +263,7 @@ import org.apache.nifi.web.dao.LabelDAO;
import org.apache.nifi.web.dao.PortDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.ProcessorDAO;
import org.apache.nifi.web.dao.RegistryDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.apache.nifi.web.dao.SnippetDAO;
@ -257,30 +282,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@ -312,6 +314,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private UserDAO userDAO;
private UserGroupDAO userGroupDAO;
private AccessPolicyDAO accessPolicyDAO;
private RegistryDAO registryDAO;
private ClusterCoordinator clusterCoordinator;
private HeartbeatMonitor heartbeatMonitor;
private LeaderElectionManager leaderElectionManager;
@ -331,8 +334,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private AuthorizableLookup authorizableLookup;
private Map<String, Tuple<Revision, RegistryDTO>> registryCache = new HashMap<>();
// -----------------------------------------
// Synchronization methods
// -----------------------------------------
@ -1848,6 +1849,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
templateDAO.verifyComponentTypes(snippet);
}
@Override
public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
controllerFacade.verifyComponentTypes(versionedGroup);
}
@Override
public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
// get the specified snippet
@ -2260,44 +2266,101 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createControllerServiceEntity(snapshot, null, permissions, null);
}
private RegistryEntity createRegistryEntity(final Revision updatedRevision, final RegistryDTO registryDTO) {
@Override
public RegistryEntity createRegistry(Revision revision, RegistryDTO registryDTO) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// read lock on the containing group
// request claim for component to be created... revision already verified (version == 0)
final RevisionClaim claim = new StandardRevisionClaim(revision);
// update revision through revision manager
final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
// add the component
final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO);
// save the flow
controllerFacade.save();
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
return new StandardRevisionUpdate<>(registry, lastMod);
});
final FlowRegistry registry = revisionUpdate.getComponent();
return createRegistryEntity(registry);
}
@Override
public RegistryEntity getRegistry(final String registryId) {
final FlowRegistry registry = registryDAO.getFlowRegistry(registryId);
return createRegistryEntity(registry);
}
private RegistryEntity createRegistryEntity(final FlowRegistry flowRegistry) {
if (flowRegistry == null) {
return null;
}
final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry);
final Revision revision = revisionManager.getRevision(dto.getId());
final RegistryEntity entity = new RegistryEntity();
entity.setId(registryDTO.getId());
entity.setPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
entity.setRevision(dtoFactory.createRevisionDTO(updatedRevision));
entity.setComponent(registryDTO);
entity.setComponent(dto);
entity.setRevision(dtoFactory.createRevisionDTO(revision));
entity.setId(dto.getId());
// User who created it can read/write it.
final PermissionsDTO permissions = new PermissionsDTO();
permissions.setCanRead(true);
permissions.setCanWrite(true);
entity.setPermissions(permissions);
return entity;
}
@Override
public RegistryEntity createRegistry(Revision revision, RegistryDTO registryDTO) {
registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO));
return createRegistryEntity(revision, registryDTO);
}
@Override
public RegistryEntity getRegistry(String registryId) {
final Tuple<Revision, RegistryDTO> registry = registryCache.get(registryId);
return createRegistry(registry.getKey(), registry.getValue());
}
@Override
public Set<RegistryEntity> getRegistries() {
return registryCache.values().stream()
.map(registry -> createRegistry(registry.getKey(), registry.getValue()))
.collect(Collectors.toSet());
return registryDAO.getFlowRegistries().stream()
.map(this::createRegistryEntity)
.collect(Collectors.toSet());
}
@Override
public RegistryEntity updateRegistry(Revision revision, RegistryDTO registryDTO) {
registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO));
return createRegistryEntity(revision, registryDTO);
final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId());
final RevisionUpdate<FlowRegistry> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> {
registry.setDescription(registryDTO.getDescription());
registry.setName(registryDTO.getName());
registry.setURL(registryDTO.getUri());
controllerFacade.save();
final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
return new StandardRevisionUpdate<FlowRegistry>(registry, lastModification);
});
final FlowRegistry updatedReg = revisionUpdate.getComponent();
return createRegistryEntity(updatedReg);
}
@Override
public RegistryEntity deleteRegistry(Revision revision, String registryId) {
final Tuple<Revision, RegistryDTO> registry = registryCache.remove(registryId);
return createRegistryEntity(registry.getKey(), registry.getValue());
public RegistryEntity deleteRegistry(final Revision revision, final String registryId) {
final RevisionClaim claim = new StandardRevisionClaim(revision);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> {
final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId);
controllerFacade.save();
return reg;
});
return createRegistryEntity(registry);
}
@Override
@ -3665,6 +3728,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
})
.collect(Collectors.toCollection(HashSet::new));
final Map<String, List<Connection>> connectionsByVersionedId = group.findAllConnections().stream()
.filter(conn -> conn.getVersionedComponentId().isPresent())
.collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get()));
for (final FlowDifference difference : comparison.getDifferences()) {
VersionedComponent component = difference.getComponentA();
if (component == null) {
@ -3674,31 +3741,40 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
final VersionedConnection connection = (VersionedConnection) component;
final ConnectableComponent source = connection.getSource();
final ConnectableComponent destination = connection.getDestination();
final String versionedConnectionId = connection.getIdentifier();
final List<Connection> instances = connectionsByVersionedId.get(versionedConnectionId);
if (instances == null) {
continue;
}
affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user));
affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user));
for (final Connection instance : instances) {
affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user));
affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user));
}
}
}
return affectedComponents;
}
private String getComponentState(final InstantiatedConnectableComponent localComponent) {
final String componentId = localComponent.getInstanceId();
final String groupId = localComponent.getInstanceGroupId();
switch (localComponent.getType()) {
case PROCESSOR:
return processorDAO.getProcessor(componentId).getPhysicalScheduledState().name();
case REMOTE_INPUT_PORT:
return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getInputPort(componentId).getScheduledState().name();
case REMOTE_OUTPUT_PORT:
return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getOutputPort(componentId).getScheduledState().name();
default:
return null;
}
private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) {
final AffectedComponentEntity entity = new AffectedComponentEntity();
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
entity.setId(connectable.getIdentifier());
final Authorizable authorizable = getAuthorizable(connectable);
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
entity.setPermissions(permissionsDto);
final AffectedComponentDTO dto = new AffectedComponentDTO();
dto.setId(connectable.getIdentifier());
dto.setReferenceType(connectable.getConnectableType().name());
dto.setProcessGroupId(connectable.getProcessGroupIdentifier());
dto.setState(connectable.getScheduledState().name());
entity.setComponent(dto);
return entity;
}
private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
@ -3720,24 +3796,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entity;
}
private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) {
final AffectedComponentEntity entity = new AffectedComponentEntity();
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
entity.setId(instance.getInstanceId());
final String componentTypeName = instance.getType().name();
final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
entity.setPermissions(permissionsDto);
final AffectedComponentDTO dto = new AffectedComponentDTO();
dto.setId(instance.getInstanceId());
dto.setReferenceType(componentTypeName);
dto.setProcessGroupId(instance.getInstanceGroupId());
dto.setState(getComponentState(instance));
entity.setComponent(dto);
return entity;
private Authorizable getAuthorizable(final Connectable connectable) {
switch (connectable.getConnectableType()) {
case REMOTE_INPUT_PORT:
case REMOTE_OUTPUT_PORT:
final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
return authorizableLookup.getRemoteProcessGroup(rpgId);
default:
return authorizableLookup.getLocalConnectable(connectable.getIdentifier());
}
}
private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
@ -3820,7 +3888,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates();
final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryUrl = remoteCoordinates.getRegistryUrl();
@ -3868,17 +3936,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return updateProcessGroup(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified);
return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true);
}
@Override
public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
processGroupNode,
() -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified),
() -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings),
processGroup -> dtoFactory.createProcessGroupDto(processGroup));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
@ -4243,27 +4311,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
this.leaderElectionManager = leaderElectionManager;
}
public void setRegistryDAO(RegistryDAO registryDao) {
this.registryDAO = registryDao;
}
public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
this.flowRegistryClient = flowRegistryClient;
// temp code to load the registry client cache
final Set<String> registryIdentifiers = flowRegistryClient.getRegistryIdentifiers();
if (registryIdentifiers != null) {
for (final String registryIdentifier : registryIdentifiers) {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryIdentifier);
final RegistryDTO registry = new RegistryDTO();
registry.setId(registryIdentifier);
registry.setName(flowRegistry.getName());
registry.setUri(flowRegistry.getURL());
registry.setDescription("Default client for storing Flow Revisions to the local disk.");
final RegistryEntity registryEntity = new RegistryEntity();
registryEntity.setComponent(registry);
registryCache.put(registryIdentifier, new Tuple(new Revision(0L, null, registryIdentifier), registry));
}
}
}
}

View File

@ -16,12 +16,57 @@
*/
package org.apache.nifi.web.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@ -42,6 +87,8 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@ -64,6 +111,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
@ -104,55 +152,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
@ -826,7 +831,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
@ -983,7 +988,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
@ -1513,9 +1518,10 @@ public class ProcessGroupResource extends ApplicationResource {
* Adds the specified process group.
*
* @param httpServletRequest request
* @param groupId The group id
* @param groupId The group id
* @param requestProcessGroupEntity A processGroupEntity
* @return A processGroupEntity
* @throws IOException if the request indicates that the Process Group should be imported from a Flow Registry and NiFi is unable to communicate with the Flow Registry
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@ -1547,7 +1553,7 @@ public class ProcessGroupResource extends ApplicationResource {
@ApiParam(
value = "The process group configuration details.",
required = true
) final ProcessGroupEntity requestProcessGroupEntity) {
) final ProcessGroupEntity requestProcessGroupEntity) throws IOException {
if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
@ -1574,6 +1580,28 @@ public class ProcessGroupResource extends ApplicationResource {
}
requestProcessGroupEntity.getComponent().setParentGroupId(groupId);
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
// Step 3: Resolve Bundle info
// Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
// Step 5: If any of the components is a Restricted Component, then we must authorize the user
// for write access to the RestrictedComponents resource
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
if (versionControlInfo != null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
// Step 3: Resolve Bundle info
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
}
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);
}
@ -1584,8 +1612,23 @@ public class ProcessGroupResource extends ApplicationResource {
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// Step 5: If any of the components is a Restricted Component, then we must authorize the user
// for write access to the RestrictedComponents resource
final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
final boolean containsRestrictedComponent = FlowRegistryUtils.containsRestrictedComponent(versionedFlowSnapshot.getFlowContents());
if (containsRestrictedComponent) {
lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
}
}
},
() -> {
final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
}
},
null,
processGroupGroupEntity -> {
// set the processor id as appropriate
processGroupGroupEntity.getComponent().setId(generateUuid());
@ -1593,6 +1636,16 @@ public class ProcessGroupResource extends ApplicationResource {
// create the process group contents
final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId());
final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (flowSnapshot != null) {
final RevisionDTO revisionDto = entity.getRevision();
final String newGroupId = entity.getComponent().getId();
final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
}
populateRemainingProcessGroupEntityContent(entity);
// generate a 201 created response

View File

@ -17,12 +17,39 @@
package org.apache.nifi.web.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.Authorizer;
@ -30,12 +57,11 @@ import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
@ -48,7 +74,6 @@ import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager;
import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
@ -71,36 +96,12 @@ import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
@Path("/versions")
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
@ -163,7 +164,7 @@ public class VersionsResource extends ApplicationResource {
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("start-requests")
@ApiOperation(
@ -402,10 +403,10 @@ public class VersionsResource extends ApplicationResource {
final NodeResponse clusterResponse;
try {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse();
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse();
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@ -466,10 +467,10 @@ public class VersionsResource extends ApplicationResource {
final NodeResponse clusterResponse;
try {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse();
clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse();
getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@ -942,7 +943,7 @@ public class VersionsResource extends ApplicationResource {
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
discoverCompatibleBundles(flowSnapshot.getFlowContents());
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
@ -956,6 +957,12 @@ public class VersionsResource extends ApplicationResource {
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component affected.
authorizeAffectedComponents(lookup, affectedComponents);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
if (containsRestrictedComponents) {
lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
}
},
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
@ -1070,7 +1077,7 @@ public class VersionsResource extends ApplicationResource {
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
discoverCompatibleBundles(flowSnapshot.getFlowContents());
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
@ -1084,6 +1091,12 @@ public class VersionsResource extends ApplicationResource {
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component affected.
authorizeAffectedComponents(lookup, affectedComponents);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
if (containsRestrictedComponents) {
lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
}
},
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
@ -1258,7 +1271,7 @@ public class VersionsResource extends ApplicationResource {
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation();
serviceFacade.updateProcessGroup(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified);
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
}
asyncRequest.setLastUpdated(new Date());
@ -1384,50 +1397,6 @@ public class VersionsResource extends ApplicationResource {
this.dtoFactory = dtoFactory;
}
private BundleDTO createBundleDto(final Bundle bundle) {
final BundleDTO dto = new BundleDTO();
dto.setArtifact(bundle.getArtifact());
dto.setGroup(dto.getGroup());
dto.setVersion(dto.getVersion());
return dto;
}
/**
* Discovers the compatible bundle details for the components in the specified snippet.
*
* @param versionedGroup the versioned group
*/
private void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) {
if (versionedGroup.getProcessors() != null) {
versionedGroup.getProcessors().forEach(processor -> {
final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle()));
final Bundle bundle = new Bundle();
bundle.setArtifact(coordinate.getId());
bundle.setGroup(coordinate.getGroup());
bundle.setVersion(coordinate.getVersion());
processor.setBundle(bundle);
});
}
if (versionedGroup.getControllerServices() != null) {
versionedGroup.getControllerServices().forEach(controllerService -> {
final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle()));
final Bundle bundle = new Bundle();
bundle.setArtifact(coordinate.getId());
bundle.setGroup(coordinate.getGroup());
bundle.setVersion(coordinate.getVersion());
controllerService.setBundle(bundle);
});
}
if (versionedGroup.getProcessGroups() != null) {
versionedGroup.getProcessGroups().forEach(processGroup -> {
discoverCompatibleBundles(processGroup);
});
}
}
private static class ActiveRequest {
private static final long MAX_REQUEST_LOCK_NANOS = TimeUnit.MINUTES.toNanos(1L);

View File

@ -113,6 +113,7 @@ import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
@ -3713,6 +3714,15 @@ public final class DtoFactory {
return nodeDto;
}
public RegistryDTO createRegistryDto(FlowRegistry registry) {
final RegistryDTO dto = new RegistryDTO();
dto.setDescription(registry.getDescription());
dto.setId(registry.getIdentifier());
dto.setName(registry.getName());
dto.setUri(registry.getURL());
return dto;
}
/* setters */
public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) {

View File

@ -82,6 +82,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.ReportingTask;
@ -1643,6 +1644,9 @@ public class ControllerFacade implements Authorizable {
return dto;
}
public void verifyComponentTypes(VersionedProcessGroup versionedFlow) {
flowController.verifyComponentTypesInSnippet(versionedFlow);
}
private ComponentSearchResultDTO search(final String searchStr, final ProcessorNode procNode) {

View File

@ -113,10 +113,11 @@ public interface ProcessGroupDAO {
* @param proposedSnapshot Flow the new version of the flow
* @param versionControlInformation the new Version Control Information
* @param componentIdSeed the seed value to use for generating ID's for new components
* @param updateSettings whether or not to update the process group's name and position
* @return the process group
*/
ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
boolean verifyNotModified);
boolean verifyNotModified, boolean updateSettings);
/**
* Applies the given Version Control Information to the Process Group

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao;
import java.util.Set;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.web.api.dto.RegistryDTO;
public interface RegistryDAO {
FlowRegistry createFlowRegistry(RegistryDTO registryDto);
FlowRegistry getFlowRegistry(String registryId);
Set<FlowRegistry> getFlowRegistries();
FlowRegistry removeFlowRegistry(String registryId);
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.RegistryDTO;
import org.apache.nifi.web.dao.RegistryDAO;
public class FlowRegistryDAO implements RegistryDAO {
private FlowRegistryClient flowRegistryClient;
@Override
public FlowRegistry createFlowRegistry(final RegistryDTO registryDto) {
return flowRegistryClient.addFlowRegistry(registryDto.getId(), registryDto.getName(), registryDto.getUri(), registryDto.getDescription());
}
@Override
public FlowRegistry getFlowRegistry(final String registryId) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("Unable to find Flow Registry with id '" + registryId + "'");
}
return registry;
}
@Override
public Set<FlowRegistry> getFlowRegistries() {
return flowRegistryClient.getRegistryIdentifiers().stream()
.map(flowRegistryClient::getFlowRegistry)
.collect(Collectors.toSet());
}
@Override
public FlowRegistry removeFlowRegistry(final String registryId) {
final FlowRegistry registry = flowRegistryClient.removeFlowRegistry(registryId);
if (registry == null) {
throw new ResourceNotFoundException("Unable to find Flow Registry with id '" + registryId + "'");
}
return registry;
}
public void setFlowRegistryClient(FlowRegistryClient client) {
this.flowRegistryClient = client;
}
}

View File

@ -246,6 +246,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
return group;
}
@Override
public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.disconnectVersionControl();
@ -254,9 +255,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
final String componentIdSeed, final boolean verifyNotModified) {
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
final StandardVersionControlInformation svci = new StandardVersionControlInformation(
versionControlInformation.getRegistryId(),

View File

@ -125,6 +125,9 @@
<property name="flowController" ref="flowController"/>
<property name="snippetUtils" ref="snippetUtils"/>
</bean>
<bean id="flowRegistryDAO" class="org.apache.nifi.web.dao.impl.FlowRegistryDAO">
<property name="flowRegistryClient" ref="flowRegistryClient" />
</bean>
<bean id="policyBasedAuthorizerDAO" class="org.apache.nifi.web.dao.impl.StandardPolicyBasedAuthorizerDAO">
<constructor-arg ref="authorizer"/>
</bean>
@ -182,6 +185,7 @@
<property name="bulletinRepository" ref="bulletinRepository"/>
<property name="leaderElectionManager" ref="leaderElectionManager" />
<property name="flowRegistryClient" ref="flowRegistryClient" />
<property name="registryDAO" ref="flowRegistryDAO" />
</bean>
<!-- component ui extension configuration context -->