diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
index 83af9d8fa6..1e2a4b4854 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.web.api.entity;
-import io.swagger.annotations.ApiModelProperty;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import javax.xml.bind.annotation.XmlRootElement;
+import io.swagger.annotations.ApiModelProperty;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO.
@@ -30,6 +32,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible
true,
* and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
* throw an IllegalStateException
+ * @param updateSettings whether or not to update the process group's name and positions
*/
- void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
+ void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
/**
* Verifies a template with the specified name can be created.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 962a9406eb..4efff943d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -24,17 +24,47 @@ import java.io.IOException;
import java.util.Set;
public interface FlowRegistry {
+ /**
+ * @return the ID of the Flow Registry
+ */
+ String getIdentifier();
+
+ /**
+ * @return the description of the Flow Registry
+ */
+ String getDescription();
+
+ /**
+ * Updates the Flow Registry's description
+ *
+ * @param description the description of the Flow Registry
+ */
+ void setDescription(String description);
/**
* @return the URL of the Flow Registry
*/
String getURL();
+ /**
+ * Updates the Flow Registry's URL
+ *
+ * @param url the URL of the Flow Registry
+ */
+ void setURL(String url);
+
/**
* @return the name of the Flow Registry
*/
String getName();
+ /**
+ * Updates the name of the Flow Registry
+ *
+ * @param name the name of the Flow Registry
+ */
+ void setName(String name);
+
/**
* Gets the buckets for the specified user.
*
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
index 83f66dc7db..77c2761404 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -34,4 +34,10 @@ public interface FlowRegistryClient {
}
Set getRegistryIdentifiers();
+
+ void addFlowRegistry(FlowRegistry registry);
+
+ FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description);
+
+ FlowRegistry removeFlowRegistry(String registryId);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 242ef6af6c..5ed5b6e378 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -165,6 +165,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -2128,6 +2130,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set supportedBundles) {
+ final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+ if (!supportedBundles.contains(requiredCoordinate)) {
+ throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+ }
+ }
+
private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map> supportedTypes) {
if (templateContents.getProcessors() != null) {
templateContents.getProcessors().forEach(processor -> {
@@ -2150,6 +2159,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map> supportedTypes) {
+ if (versionedFlow.getProcessors() != null) {
+ versionedFlow.getProcessors().forEach(processor -> {
+ if (processor.getBundle() == null) {
+ throw new IllegalArgumentException("Processor bundle must be specified.");
+ }
+
+ if (supportedTypes.containsKey(processor.getType())) {
+ verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
+ } else {
+ throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
+ }
+ });
+ }
+
+ if (versionedFlow.getProcessGroups() != null) {
+ versionedFlow.getProcessGroups().forEach(processGroup -> {
+ verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
+ });
+ }
+ }
+
private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map> supportedTypes) {
if (templateContents.getControllerServices() != null) {
templateContents.getControllerServices().forEach(controllerService -> {
@@ -2172,6 +2203,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map> supportedTypes) {
+ if (versionedFlow.getControllerServices() != null) {
+ versionedFlow.getControllerServices().forEach(controllerService -> {
+ if (supportedTypes.containsKey(controllerService.getType())) {
+ if (controllerService.getBundle() == null) {
+ throw new IllegalArgumentException("Controller Service bundle must be specified.");
+ }
+
+ verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
+ } else {
+ throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
+ }
+ });
+ }
+
+ if (versionedFlow.getProcessGroups() != null) {
+ versionedFlow.getProcessGroups().forEach(processGroup -> {
+ verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
+ });
+ }
+ }
+
public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
final Map> processorClasses = new HashMap<>();
for (final Class> c : ExtensionManager.getExtensions(Processor.class)) {
@@ -2210,6 +2263,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
+ final Map> processorClasses = new HashMap<>();
+ for (final Class> c : ExtensionManager.getExtensions(Processor.class)) {
+ final String name = c.getName();
+ processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ }
+ verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
+
+ final Map> controllerServiceClasses = new HashMap<>();
+ for (final Class> c : ExtensionManager.getExtensions(ControllerService.class)) {
+ final String name = c.getName();
+ controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ }
+ verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
+
+ final Set prioritizerClasses = new HashSet<>();
+ for (final Class> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+ prioritizerClasses.add(c.getName());
+ }
+
+ final Set allConns = new HashSet<>();
+ allConns.addAll(versionedFlow.getConnections());
+ for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
+ allConns.addAll(findAllConnections(childGroup));
+ }
+
+ for (final VersionedConnection conn : allConns) {
+ final List prioritizers = conn.getPrioritizers();
+ if (prioritizers != null) {
+ for (final String prioritizer : prioritizers) {
+ if (!prioritizerClasses.contains(prioritizer)) {
+ throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
+ }
+ }
+ }
+ }
+ }
+
/**
*
* Verifies that the given DTO is valid, according to the following:
@@ -2270,6 +2361,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return conns;
}
+ private Set findAllConnections(final VersionedProcessGroup group) {
+ final Set conns = new HashSet<>();
+ for (final VersionedConnection connection : group.getConnections()) {
+ conns.add(connection);
+ }
+
+ for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+ conns.addAll(findAllConnections(childGroup));
+ }
+ return conns;
+ }
+
//
// Processor access
//
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index e879e383b7..5a7aeec801 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -85,6 +85,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -184,7 +185,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
try {
if (flowAlreadySynchronized) {
existingFlow = toBytes(controller);
- existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty();
+ existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty()
+ && controller.getAllReportingTasks().isEmpty()
+ && controller.getAllControllerServices().isEmpty()
+ && controller.getFlowRegistryClient().getRegistryIdentifiers().isEmpty();
} else {
existingFlow = readFlowFromDisk();
if (existingFlow == null || existingFlow.length == 0) {
@@ -220,10 +224,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
}
+ final boolean registriesPresent;
+ final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+ if (registriesElement == null) {
+ registriesPresent = false;
+ } else {
+ final List flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+ registriesPresent = !flowRegistryElems.isEmpty();
+ }
+
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
- existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
+ existingFlowEmpty = taskElements.isEmpty()
+ && unrootedControllerServiceElements.isEmpty()
+ && isEmpty(rootGroupDto)
+ && registriesPresent;
logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
}
}
@@ -318,6 +334,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the root group XML element
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
+ if (!flowAlreadySynchronized || existingFlowEmpty) {
+ final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+ if (registriesElement != null) {
+ final List flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+ for (final Element flowRegistryElement : flowRegistryElems) {
+ final String registryId = getString(flowRegistryElement, "id");
+ final String registryName = getString(flowRegistryElement, "name");
+ final String registryUrl = getString(flowRegistryElement, "url");
+ final String description = getString(flowRegistryElement, "description");
+
+ final FlowRegistryClient client = controller.getFlowRegistryClient();
+ client.addFlowRegistry(registryId, registryName, registryUrl, description);
+ }
+ }
+ }
+
// if this controller isn't initialized or its empty, add the root group, otherwise update
final ProcessGroup rootGroup;
if (!flowAlreadySynchronized || existingFlowEmpty) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index ecf24383d2..f921bc6dbd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -39,6 +39,8 @@ import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
@@ -74,7 +76,7 @@ import java.util.concurrent.TimeUnit;
*/
public class StandardFlowSerializer implements FlowSerializer {
- private static final String MAX_ENCODING_VERSION = "1.2";
+ private static final String MAX_ENCODING_VERSION = "1.3";
private final StringEncryptor encryptor;
@@ -98,6 +100,11 @@ public class StandardFlowSerializer implements FlowSerializer {
doc.appendChild(rootNode);
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
+
+ final Element registriesElement = doc.createElement("registries");
+ rootNode.appendChild(registriesElement);
+
+ addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
// Add root-level controller services
@@ -130,6 +137,26 @@ public class StandardFlowSerializer implements FlowSerializer {
}
}
+ private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
+ for (final String registryId : registryClient.getRegistryIdentifiers()) {
+ final FlowRegistry flowRegistry = registryClient.getFlowRegistry(registryId);
+
+ final Element registryElement = parentElement.getOwnerDocument().createElement("flowRegistry");
+ parentElement.appendChild(registryElement);
+
+ addStringElement(registryElement, "id", flowRegistry.getIdentifier());
+ addStringElement(registryElement, "name", flowRegistry.getName());
+ addStringElement(registryElement, "url", flowRegistry.getURL());
+ addStringElement(registryElement, "description", flowRegistry.getDescription());
+ }
+ }
+
+ private void addStringElement(final Element parentElement, final String elementName, final String value) {
+ final Element childElement = parentElement.getOwnerDocument().createElement(elementName);
+ childElement.setTextContent(value);
+ parentElement.appendChild(childElement);
+ }
+
private void addSize(final Element parentElement, final Size size) {
final Element element = parentElement.getOwnerDocument().createElement("size");
element.setAttribute("width", String.valueOf(size.getWidth()));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 3aa5084030..e1846a08d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.stream.Stream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
@@ -198,6 +199,21 @@ public class FingerprintFactory {
}
private StringBuilder addFlowControllerFingerprint(final StringBuilder builder, final Element flowControllerElem, final FlowController controller) {
+ // registries
+ final Element registriesElement = DomUtils.getChild(flowControllerElem, "registries");
+ if (registriesElement == null) {
+ builder.append("NO_VALUE");
+ } else {
+ final List flowRegistryElems = DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry");
+ if (flowRegistryElems.isEmpty()) {
+ builder.append("NO_VALUE");
+ } else {
+ for (final Element flowRegistryElement : flowRegistryElems) {
+ addFlowRegistryFingerprint(builder, flowRegistryElement);
+ }
+ }
+ }
+
// root group
final Element rootGroupElem = (Element) DomUtils.getChildNodesByTagName(flowControllerElem, "rootGroup").item(0);
addProcessGroupFingerprint(builder, rootGroupElem, controller);
@@ -265,6 +281,11 @@ public class FingerprintFactory {
return builder;
}
+ private StringBuilder addFlowRegistryFingerprint(final StringBuilder builder, final Element flowRegistryElement) {
+ Stream.of("id", "name", "url", "description").forEach(elementName -> appendFirstValue(builder, DomUtils.getChildNodesByTagName(flowRegistryElement, elementName)));
+ return builder;
+ }
+
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3b8117bb06..1d8652e15a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -72,7 +72,7 @@ import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.UnknownResourceException;
import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -2835,11 +2835,14 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+ @Override
public void disconnectVersionControl() {
writeLock.lock();
try {
- // TODO remove version component ids from each component (until another versioned PG is encountered)
this.versionControlInfo.set(null);
+
+ // remove version component ids from each component (until another versioned PG is encountered)
+ applyVersionedComponentIds(this, id -> null);
} finally {
writeLock.unlock();
}
@@ -2850,36 +2853,41 @@ public final class StandardProcessGroup implements ProcessGroup {
return;
}
- processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier()));
+ applyVersionedComponentIds(processGroup, versionedComponentIds::get);
+ }
+
+ private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function lookup) {
+ processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
processGroup.getConnections().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getProcessors().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getInputPorts().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getOutputPorts().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getLabels().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getFunnels().stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getControllerServices(false).stream()
- .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+ .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getRemoteProcessGroups().stream()
.forEach(rpg -> {
- rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier()));
+ rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
rpg.getInputPorts().stream()
- .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+ .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
rpg.getOutputPorts().stream()
- .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+ .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
});
processGroup.getProcessGroups().stream()
- .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds));
+ .filter(childGroup -> childGroup.getVersionControlInformation() != null)
+ .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
@@ -2931,10 +2939,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
- public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
+ public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
writeLock.lock();
try {
- verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
+ verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
@@ -2950,15 +2958,15 @@ public final class StandardProcessGroup implements ProcessGroup {
.map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
.collect(Collectors.toSet());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
- } else {
- // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient
- // than having to remember to enable DEBUG level logging every time a full build is done.
- LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
+ if (LOG.isInfoEnabled()) {
+ final String differencesByLine = flowComparison.getDifferences().stream()
+ .map(FlowDifference::toString)
+ .collect(Collectors.joining("\n"));
+
+ LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
}
- updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
+ updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} finally {
@@ -2968,10 +2976,14 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
- final Set updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException {
+ final Set updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
group.setComments(proposed.getComments());
- group.setName(proposed.getName());
+
+ if (updateName) {
+ group.setName(proposed.getName());
+ }
+
if (updatePosition && proposed.getPosition() != null) {
group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
}
@@ -2998,7 +3010,7 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVariables(updatedVariableMap);
- final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates();
+ final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
final String bucketId = remoteCoordinates.getBucketId();
@@ -3022,7 +3034,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed);
LOG.info("Added {} to {}", added, this);
} else {
- updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true);
+ updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
LOG.info("Updated {}", childGroup);
}
@@ -3136,14 +3148,29 @@ public final class StandardProcessGroup implements ProcessGroup {
final Map processorsByVersionedId = group.getProcessors().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
+ final Map> autoTerminatedRelationships = new HashMap<>();
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed);
+
+ final Set proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+ .map(relName -> added.getRelationship(relName))
+ .collect(Collectors.toSet());
+ autoTerminatedRelationships.put(added, proposedAutoTerminated);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor);
+
+ final Set proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+ .map(relName -> processor.getRelationship(relName))
+ .collect(Collectors.toSet());
+
+ if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
+ autoTerminatedRelationships.put(processor, proposedAutoTerminated);
+ }
+
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@@ -3205,6 +3232,13 @@ public final class StandardProcessGroup implements ProcessGroup {
group.removeConnection(connection);
}
+ // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
+ // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
+ // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
+ // Connection for that relationship exists. This will throw an Exception.
+ autoTerminatedRelationships.forEach((proc, rels) -> proc.setAutoTerminatedRelationships(rels));
+
+ // Remove all controller services no longer in use
for (final String removedVersionedId : controllerServicesRemoved) {
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", service, group);
@@ -3276,7 +3310,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier());
addProcessGroup(group);
- updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true);
+ updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
return group;
}
@@ -3535,10 +3569,6 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setYieldPeriod(proposed.getYieldDuration());
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
- processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
- .map(relName -> processor.getRelationship(relName))
- .collect(Collectors.toSet()));
-
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List descriptors = new ArrayList<>(processor.getProperties().keySet());
@@ -3547,6 +3577,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+
private Map populatePropertiesMap(final Map currentProperties, final Map proposedProperties) {
final Map fullPropertyMap = new HashMap<>();
for (final PropertyDescriptor property : currentProperties.keySet()) {
@@ -3646,7 +3677,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public String getName() {
- return "Flow Under Version Control";
+ return "Versioned Flow";
}
};
@@ -3659,7 +3690,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.findAny()
.isPresent();
- LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
+ LOG.debug("There are {} differences between this Local FLow and the Versioned Flow: {}", differences.size(), differences);
return Optional.of(modified);
}
@@ -3669,27 +3700,24 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.lock();
try {
final VersionControlInformation versionControlInfo = getVersionControlInformation();
- if (versionControlInfo == null) {
- throw new IllegalStateException("Cannot update the Version of the flow for " + this
- + " because the Process Group is not currently under Version Control");
- }
-
- if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
- throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
- }
-
- if (verifyNotDirty) {
- final Optional modifiedOption = versionControlInfo.getModified();
- if (!modifiedOption.isPresent()) {
- throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
- + "has not yet been synchronized with the Flow Registry. The Process Group must be"
- + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+ if (versionControlInfo != null) {
+ if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
+ throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}
- if (Boolean.TRUE.equals(modifiedOption.get())) {
- throw new IllegalStateException("Cannot change the Version of the flow for " + this
- + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
- + " restored to its original form before changing the version");
+ if (verifyNotDirty) {
+ final Optional modifiedOption = versionControlInfo.getModified();
+ if (!modifiedOption.isPresent()) {
+ throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+ + "has not yet been synchronized with the Flow Registry. The Process Group must be"
+ + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+ }
+
+ if (Boolean.TRUE.equals(modifiedOption.get())) {
+ throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+ + " restored to its original form before changing the version");
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
similarity index 89%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
index 2cc39c62d9..da5880cc52 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
@@ -17,22 +17,13 @@
package org.apache.nifi.registry.flow;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.registry.bucket.Bucket;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collections;
+import java.net.URI;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,22 +35,44 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* A simple file-based implementation of a Flow Registry Client. Rather than interacting
* with an actual Flow Registry, this implementation simply reads flows from disk and writes
* them to disk. It is not meant for any production use but is available for testing purposes.
*/
-public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
+public class FileBasedFlowRegistry implements FlowRegistry {
private final File directory;
private final Map> flowNamesByBucket = new HashMap<>();
private final JsonFactory jsonFactory = new JsonFactory();
+ private final String id;
+ private volatile String name = "Local Registry";
+ private volatile String url = "file:" + (new File("..").getAbsolutePath());
+ private volatile String description = "Default file-based Flow Registry";
+
+ public FileBasedFlowRegistry(final String id, final String url) throws IOException {
+ final URI uri = URI.create(url);
+ if (!uri.getScheme().equalsIgnoreCase("file")) {
+ throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
+ }
+
+ this.directory = new File(URI.create(url).getPath());
- public FileBasedFlowRegistryClient(final File directory) throws IOException {
if (!directory.exists() && !directory.mkdirs()) {
throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
}
- this.directory = directory;
+ this.id = id;
+ this.url = url;
recoverBuckets();
}
@@ -101,23 +114,14 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
}
}
- @Override
- public FlowRegistry getFlowRegistry(final String registryId) {
- if (!"default".equals(registryId)) {
- return null;
- }
-
- return this;
- }
-
@Override
public String getURL() {
- return directory.toURI().toString();
+ return url;
}
@Override
public String getName() {
- return "Local Registry";
+ return name;
}
@Override
@@ -138,12 +142,29 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
bucket.setName("Bucket '" + bucketIdentifier + "'");
bucket.setCreatedTimestamp(creation);
+ final Set versionedFlows = new HashSet<>();
+ final File[] flowDirs = bucketDirectory.listFiles();
+ if (flowDirs != null) {
+ for (final File flowDir : flowDirs) {
+ final String flowIdentifier = flowDir.getName();
+ try {
+ final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
+ versionedFlows.add(versionedFlow);
+ } catch (UnknownResourceException e) {
+ continue;
+ }
+ }
+ }
+
+ bucket.setVersionedFlows(versionedFlows);
+
buckets.add(bucket);
}
return buckets;
}
+
@Override
public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
Objects.requireNonNull(flow);
@@ -241,7 +262,7 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
final File contentsFile = new File(snapshotDir, "flow.xml");
try (final OutputStream out = new FileOutputStream(contentsFile);
- final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) {
+ final JsonGenerator generator = jsonFactory.createGenerator(out)) {
generator.setCodec(new ObjectMapper());
generator.setPrettyPrinter(new DefaultPrettyPrinter());
generator.writeObject(snapshot);
@@ -269,11 +290,6 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
return response;
}
- @Override
- public Set getRegistryIdentifiers() {
- return Collections.singleton("default");
- }
-
@Override
public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
// Verify that the bucket exists
@@ -400,6 +416,8 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
flow.setSnapshotMetadata(snapshotMetadataSet);
final File[] versionDirs = flowDir.listFiles();
+ flow.setVersionCount(versionDirs.length);
+
for (final File file : versionDirs) {
if (!file.isDirectory()) {
continue;
@@ -432,4 +450,29 @@ public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegi
return flow;
}
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public void setURL(String url) {
+ this.url = url;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
new file mode 100644
index 0000000000..828b970d7a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class StandardFlowRegistryClient implements FlowRegistryClient {
+ private ConcurrentMap registryById = new ConcurrentHashMap<>();
+
+ @Override
+ public FlowRegistry getFlowRegistry(String registryId) {
+ return registryById.get(registryId);
+ }
+
+ @Override
+ public Set getRegistryIdentifiers() {
+ return registryById.keySet();
+ }
+
+ @Override
+ public void addFlowRegistry(final FlowRegistry registry) {
+ final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry);
+ if (existing != null) {
+ throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier());
+ }
+ }
+
+ @Override
+ public FlowRegistry addFlowRegistry(final String registryId, final String registryName, final String registryUrl, final String description) {
+ final URI uri = URI.create(registryUrl);
+ final String uriScheme = uri.getScheme();
+
+ final FlowRegistry registry;
+ if (uriScheme.equalsIgnoreCase("file")) {
+ try {
+ registry = new FileBasedFlowRegistry(registryId, registryUrl);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl, e);
+ }
+
+ registry.setName(registryName);
+ registry.setDescription(description);
+ } else {
+ throw new IllegalArgumentException("Cannot create Flow Registry with URI of " + registryUrl
+ + " because there are no known implementations of Flow Registries that can handle URIs of scheme " + uriScheme);
+ }
+
+ addFlowRegistry(registry);
+ return registry;
+ }
+
+ @Override
+ public FlowRegistry removeFlowRegistry(final String registryId) {
+ return registryById.remove(registryId);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index e3edc30f93..a75d112250 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -28,10 +28,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@@ -46,6 +48,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
@@ -56,13 +59,14 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.Position;
-import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
@@ -78,9 +82,91 @@ public class NiFiRegistryFlowMapper {
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) {
versionedComponentIds.clear();
- return mapGroup(group, registryClient, true);
+ final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true);
+
+ // TODO: Test that this works properly
+ populateReferencedAncestorServices(group, mapped);
+
+ // TODO: Test that this works properly
+ populateReferencedAncestorVariables(group, mapped);
+
+ return mapped;
}
+ private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
+ final Set ancestorControllerServices = group.getControllerServices(true);
+ ancestorControllerServices.remove(group.getControllerServices(false));
+ final Map ancestorServicesById = ancestorControllerServices.stream()
+ .collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity()));
+
+ final Set referenced = new HashSet<>();
+
+ for (final ProcessorNode processor : group.findAllProcessors()) {
+ findReferencedServices(processor, ancestorServicesById, referenced);
+ }
+
+ for (final ControllerServiceNode service : group.findAllControllerServices()) {
+ findReferencedServices(service, ancestorServicesById, referenced);
+ }
+
+ final Set versionedServices = referenced.stream().map(this::mapControllerService)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+
+ versionedGroup.getControllerServices().addAll(versionedServices);
+ }
+
+ private Set findReferencedServices(final ConfiguredComponent component, final Map ancestorServicesById,
+ final Set referenced) {
+
+ for (final Map.Entry entry : component.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null) {
+ final String serviceId = entry.getValue();
+ final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId);
+ if (serviceNode != null) {
+ referenced.add(serviceNode);
+ referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced));
+ }
+ }
+ }
+
+ return referenced;
+ }
+
+ private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
+ final Set ancestorVariableNames = new HashSet<>();
+ populateVariableNames(group.getParent(), ancestorVariableNames);
+
+ final Map implicitlyDefinedVariables = new HashMap<>();
+ for (final String variableName : ancestorVariableNames) {
+ final boolean isReferenced = !group.getComponentsAffectedByVariable(variableName).isEmpty();
+ if (isReferenced) {
+ final String value = group.getVariableRegistry().getVariableValue(variableName);
+ implicitlyDefinedVariables.put(variableName, value);
+ }
+ }
+
+ if (!implicitlyDefinedVariables.isEmpty()) {
+ // Merge the implicit variables with the explicitly defined variables for the Process Group
+ // and set those as the Versioned Group's variables.
+ implicitlyDefinedVariables.putAll(versionedGroup.getVariables());
+ versionedGroup.setVariables(implicitlyDefinedVariables);
+ }
+ }
+
+ private void populateVariableNames(final ProcessGroup group, final Set variableNames) {
+ if (group == null) {
+ return;
+ }
+
+ group.getVariableRegistry().getVariableMap().keySet().stream()
+ .map(VariableDescriptor::getName)
+ .forEach(variableNames::add);
+
+ populateVariableNames(group.getParent(), variableNames);
+ }
+
+
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
@@ -95,7 +181,7 @@ public class NiFiRegistryFlowMapper {
if (!topLevel) {
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {
- final RemoteFlowCoordinates coordinates = new RemoteFlowCoordinates();
+ final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
final String registryId = versionControlInfo.getRegistryIdentifier();
final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
if (registry == null) {
@@ -237,6 +323,7 @@ public class NiFiRegistryFlowMapper {
private Map mapProperties(final ConfiguredComponent component) {
final Map mapped = new HashMap<>();
component.getProperties().keySet().stream()
+ .filter(property -> !property.isSensitive())
.forEach(property -> {
String value = component.getProperty(property);
if (value == null) {
@@ -312,7 +399,7 @@ public class NiFiRegistryFlowMapper {
versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks());
versionedPort.setName(port.getName());
versionedPort.setPosition(mapPosition(port.getPosition()));
- versionedPort.setType(PortType.valueOf(port.getComponentType()));
+ versionedPort.setType(PortType.valueOf(port.getConnectableType().name()));
return versionedPort;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
index eda045a6b1..807691f79b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java
@@ -19,6 +19,7 @@ package org.apache.nifi.util;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.api.dto.BundleDTO;
import java.util.List;
@@ -28,7 +29,6 @@ import java.util.stream.Collectors;
* Utility class for Bundles.
*/
public final class BundleUtils {
-
private static BundleCoordinate findBundleForType(final String type, final BundleCoordinate desiredCoordinate) {
final List bundles = ExtensionManager.getBundles(type);
if (bundles.isEmpty()) {
@@ -140,4 +140,50 @@ public final class BundleUtils {
}
}
+
+ /**
+ * Discovers the compatible bundle details for the components in the specified Versioned Process Group and updates the Versioned Process Group
+ * to reflect the appropriate bundles.
+ *
+ * @param versionedGroup the versioned group
+ */
+ public static void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) {
+ if (versionedGroup.getProcessors() != null) {
+ versionedGroup.getProcessors().forEach(processor -> {
+ final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle()));
+
+ final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle();
+ bundle.setArtifact(coordinate.getId());
+ bundle.setGroup(coordinate.getGroup());
+ bundle.setVersion(coordinate.getVersion());
+ processor.setBundle(bundle);
+ });
+ }
+
+ if (versionedGroup.getControllerServices() != null) {
+ versionedGroup.getControllerServices().forEach(controllerService -> {
+ final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle()));
+
+ final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle();
+ bundle.setArtifact(coordinate.getId());
+ bundle.setGroup(coordinate.getGroup());
+ bundle.setVersion(coordinate.getVersion());
+ controllerService.setBundle(bundle);
+ });
+ }
+
+ if (versionedGroup.getProcessGroups() != null) {
+ versionedGroup.getProcessGroups().forEach(processGroup -> {
+ discoverCompatibleBundles(processGroup);
+ });
+ }
+ }
+
+ public static BundleDTO createBundleDto(final org.apache.nifi.registry.flow.Bundle bundle) {
+ final BundleDTO dto = new BundleDTO();
+ dto.setArtifact(bundle.getArtifact());
+ dto.setGroup(dto.getGroup());
+ dto.setVersion(dto.getVersion());
+ return dto;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 8186c8bd59..8954f399fc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -26,6 +26,8 @@
+
+
@@ -38,6 +40,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index 6a3ec8ba86..fc42c62161 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -36,9 +36,7 @@
-
-
-
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 18dc51b5c8..db4ac590d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -654,7 +654,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) {
+ public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) {
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
index ca3e95f8cc..31f1fbe2cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
@@ -32,6 +32,8 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
+import java.util.Optional;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position;
@@ -208,9 +210,11 @@ public class FingerprintFactoryTest {
when(component.getProxyPort()).thenReturn(null);
when(component.getProxyUser()).thenReturn(null);
when(component.getProxyPassword()).thenReturn(null);
+ when(component.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "id" +
+ "NO_VALUE" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"eth0" +
"10 sec" +
@@ -245,9 +249,11 @@ public class FingerprintFactoryTest {
when(component.getProxyPort()).thenReturn(3128);
when(component.getProxyUser()).thenReturn("proxy-user");
when(component.getProxyPassword()).thenReturn("proxy-pass");
+ when(component.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "id" +
+ "NO_VALUE" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"NO_VALUE" +
"10 sec" +
@@ -273,6 +279,7 @@ public class FingerprintFactoryTest {
when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3));
when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi");
when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+ when(groupComponent.getVersionedComponentId()).thenReturn(Optional.empty());
final RemoteGroupPort portComponent = mock(RemoteGroupPort.class);
when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent));
@@ -288,6 +295,7 @@ public class FingerprintFactoryTest {
when(portComponent.getBatchDuration()).thenReturn("10sec");
// Serializer doesn't serialize if a port doesn't have any connection.
when(portComponent.hasIncomingConnection()).thenReturn(true);
+ when(portComponent.getVersionedComponentId()).thenReturn(Optional.empty());
// Assert fingerprints with expected one.
final String expected = "portId" +
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
new file mode 100644
index 0000000000..b1da06ace7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.web.NiFiCoreException;
+import org.apache.nifi.web.api.dto.BundleDTO;
+
+public class FlowRegistryUtils {
+
+ public static boolean containsRestrictedComponent(final VersionedProcessGroup group) {
+ final Set> componentTypes = new HashSet<>();
+ populateComponentTypes(group, componentTypes);
+
+ for (final Tuple tuple : componentTypes) {
+ final ConfigurableComponent component = ExtensionManager.getTempComponent(tuple.getKey(), tuple.getValue());
+ if (component == null) {
+ throw new NiFiCoreException("Could not create an instance of component " + tuple.getKey() + " using bundle coordinates " + tuple.getValue());
+ }
+
+ final boolean isRestricted = component.getClass().isAnnotationPresent(Restricted.class);
+ if (isRestricted) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static void populateComponentTypes(final VersionedProcessGroup group, final Set> componentTypes) {
+ group.getProcessors().stream()
+ .map(versionedProc -> new Tuple<>(versionedProc.getType(), createBundleCoordinate(versionedProc.getBundle())))
+ .forEach(componentTypes::add);
+
+ group.getControllerServices().stream()
+ .map(versionedSvc -> new Tuple<>(versionedSvc.getType(), createBundleCoordinate(versionedSvc.getBundle())))
+ .forEach(componentTypes::add);
+
+ for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+ populateComponentTypes(childGroup, componentTypes);
+ }
+ }
+
+
+ public static BundleCoordinate createBundleCoordinate(final Bundle bundle) {
+ return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+ }
+
+ public static BundleDTO createBundleDto(final Bundle bundle) {
+ return new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index a68ad0c702..d851677b70 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -406,6 +406,13 @@ public interface NiFiServiceFacade {
*/
void verifyComponentTypes(FlowSnippetDTO snippet);
+ /**
+ * Verifies the types of components in a versioned process group
+ *
+ * @param versionedGroup the proposed process group
+ */
+ void verifyComponentTypes(VersionedProcessGroup versionedGroup);
+
/**
* Creates a new Template based off the specified snippet.
*
@@ -1385,10 +1392,11 @@ public interface NiFiServiceFacade {
* @param versionControlInfo the Version Control information
* @param snapshot the new snapshot
* @param componentIdSeed the seed to use for generating new component ID's
+ * @param updateSettings whether or not the process group's name and position should be updated
* @return the Process Group
*/
- ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
- boolean verifyNotModified);
+ ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
+ boolean verifyNotModified, boolean updateSettings);
// ----------------------------------------
// Component state methods
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 0105bf18cc..2b5b5c344f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,32 @@
*/
package org.apache.nifi.web;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@@ -58,6 +83,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@@ -85,10 +111,9 @@ import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.registry.ComponentVariableRegistry;
-import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.UnknownResourceException;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
@@ -103,20 +128,19 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
-import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@@ -239,6 +263,7 @@ import org.apache.nifi.web.dao.LabelDAO;
import org.apache.nifi.web.dao.PortDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.ProcessorDAO;
+import org.apache.nifi.web.dao.RegistryDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.apache.nifi.web.dao.SnippetDAO;
@@ -257,30 +282,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -312,6 +314,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private UserDAO userDAO;
private UserGroupDAO userGroupDAO;
private AccessPolicyDAO accessPolicyDAO;
+ private RegistryDAO registryDAO;
private ClusterCoordinator clusterCoordinator;
private HeartbeatMonitor heartbeatMonitor;
private LeaderElectionManager leaderElectionManager;
@@ -331,8 +334,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private AuthorizableLookup authorizableLookup;
- private Map> registryCache = new HashMap<>();
-
// -----------------------------------------
// Synchronization methods
// -----------------------------------------
@@ -1848,6 +1849,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
templateDAO.verifyComponentTypes(snippet);
}
+ @Override
+ public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
+ controllerFacade.verifyComponentTypes(versionedGroup);
+ }
+
@Override
public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) {
// get the specified snippet
@@ -2260,44 +2266,101 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createControllerServiceEntity(snapshot, null, permissions, null);
}
- private RegistryEntity createRegistryEntity(final Revision updatedRevision, final RegistryDTO registryDTO) {
+
+ @Override
+ public RegistryEntity createRegistry(Revision revision, RegistryDTO registryDTO) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // read lock on the containing group
+ // request claim for component to be created... revision already verified (version == 0)
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+ // update revision through revision manager
+ final RevisionUpdate revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
+ // add the component
+ final FlowRegistry registry = registryDAO.createFlowRegistry(registryDTO);
+
+ // save the flow
+ controllerFacade.save();
+
+ final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+ return new StandardRevisionUpdate<>(registry, lastMod);
+ });
+
+ final FlowRegistry registry = revisionUpdate.getComponent();
+ return createRegistryEntity(registry);
+ }
+
+ @Override
+ public RegistryEntity getRegistry(final String registryId) {
+ final FlowRegistry registry = registryDAO.getFlowRegistry(registryId);
+ return createRegistryEntity(registry);
+ }
+
+ private RegistryEntity createRegistryEntity(final FlowRegistry flowRegistry) {
+ if (flowRegistry == null) {
+ return null;
+ }
+
+ final RegistryDTO dto = dtoFactory.createRegistryDto(flowRegistry);
+ final Revision revision = revisionManager.getRevision(dto.getId());
+
final RegistryEntity entity = new RegistryEntity();
- entity.setId(registryDTO.getId());
- entity.setPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
- entity.setRevision(dtoFactory.createRevisionDTO(updatedRevision));
- entity.setComponent(registryDTO);
+ entity.setComponent(dto);
+ entity.setRevision(dtoFactory.createRevisionDTO(revision));
+ entity.setId(dto.getId());
+
+ // User who created it can read/write it.
+ final PermissionsDTO permissions = new PermissionsDTO();
+ permissions.setCanRead(true);
+ permissions.setCanWrite(true);
+ entity.setPermissions(permissions);
+
return entity;
}
- @Override
- public RegistryEntity createRegistry(Revision revision, RegistryDTO registryDTO) {
- registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO));
- return createRegistryEntity(revision, registryDTO);
- }
-
- @Override
- public RegistryEntity getRegistry(String registryId) {
- final Tuple registry = registryCache.get(registryId);
- return createRegistry(registry.getKey(), registry.getValue());
- }
-
@Override
public Set getRegistries() {
- return registryCache.values().stream()
- .map(registry -> createRegistry(registry.getKey(), registry.getValue()))
- .collect(Collectors.toSet());
+ return registryDAO.getFlowRegistries().stream()
+ .map(this::createRegistryEntity)
+ .collect(Collectors.toSet());
}
@Override
public RegistryEntity updateRegistry(Revision revision, RegistryDTO registryDTO) {
- registryCache.put(registryDTO.getId(), new Tuple(revision, registryDTO));
- return createRegistryEntity(revision, registryDTO);
+ final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final FlowRegistry registry = registryDAO.getFlowRegistry(registryDTO.getId());
+ final RevisionUpdate revisionUpdate = revisionManager.updateRevision(revisionClaim, user, () -> {
+ registry.setDescription(registryDTO.getDescription());
+ registry.setName(registryDTO.getName());
+ registry.setURL(registryDTO.getUri());
+
+ controllerFacade.save();
+
+ final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
+ final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
+
+ return new StandardRevisionUpdate(registry, lastModification);
+ });
+
+ final FlowRegistry updatedReg = revisionUpdate.getComponent();
+ return createRegistryEntity(updatedReg);
}
@Override
- public RegistryEntity deleteRegistry(Revision revision, String registryId) {
- final Tuple registry = registryCache.remove(registryId);
- return createRegistryEntity(registry.getKey(), registry.getValue());
+ public RegistryEntity deleteRegistry(final Revision revision, final String registryId) {
+ final RevisionClaim claim = new StandardRevisionClaim(revision);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final FlowRegistry registry = revisionManager.deleteRevision(claim, user, () -> {
+ final FlowRegistry reg = registryDAO.removeFlowRegistry(registryId);
+ controllerFacade.save();
+ return reg;
+ });
+
+ return createRegistryEntity(registry);
}
@Override
@@ -3665,6 +3728,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
})
.collect(Collectors.toCollection(HashSet::new));
+ final Map> connectionsByVersionedId = group.findAllConnections().stream()
+ .filter(conn -> conn.getVersionedComponentId().isPresent())
+ .collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get()));
+
for (final FlowDifference difference : comparison.getDifferences()) {
VersionedComponent component = difference.getComponentA();
if (component == null) {
@@ -3674,31 +3741,40 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
final VersionedConnection connection = (VersionedConnection) component;
- final ConnectableComponent source = connection.getSource();
- final ConnectableComponent destination = connection.getDestination();
+ final String versionedConnectionId = connection.getIdentifier();
+ final List instances = connectionsByVersionedId.get(versionedConnectionId);
+ if (instances == null) {
+ continue;
+ }
- affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user));
- affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user));
+ for (final Connection instance : instances) {
+ affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user));
+ affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user));
+ }
}
}
return affectedComponents;
}
- private String getComponentState(final InstantiatedConnectableComponent localComponent) {
- final String componentId = localComponent.getInstanceId();
- final String groupId = localComponent.getInstanceGroupId();
- switch (localComponent.getType()) {
- case PROCESSOR:
- return processorDAO.getProcessor(componentId).getPhysicalScheduledState().name();
- case REMOTE_INPUT_PORT:
- return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getInputPort(componentId).getScheduledState().name();
- case REMOTE_OUTPUT_PORT:
- return remoteProcessGroupDAO.getRemoteProcessGroup(groupId).getOutputPort(componentId).getScheduledState().name();
- default:
- return null;
- }
+ private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) {
+ final AffectedComponentEntity entity = new AffectedComponentEntity();
+ entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
+ entity.setId(connectable.getIdentifier());
+
+ final Authorizable authorizable = getAuthorizable(connectable);
+ final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
+ entity.setPermissions(permissionsDto);
+
+ final AffectedComponentDTO dto = new AffectedComponentDTO();
+ dto.setId(connectable.getIdentifier());
+ dto.setReferenceType(connectable.getConnectableType().name());
+ dto.setProcessGroupId(connectable.getProcessGroupIdentifier());
+ dto.setState(connectable.getScheduledState().name());
+
+ entity.setComponent(dto);
+ return entity;
}
private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
@@ -3720,24 +3796,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entity;
}
- private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) {
- final AffectedComponentEntity entity = new AffectedComponentEntity();
- entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
- entity.setId(instance.getInstanceId());
- final String componentTypeName = instance.getType().name();
- final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
- final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
- entity.setPermissions(permissionsDto);
-
- final AffectedComponentDTO dto = new AffectedComponentDTO();
- dto.setId(instance.getInstanceId());
- dto.setReferenceType(componentTypeName);
- dto.setProcessGroupId(instance.getInstanceGroupId());
- dto.setState(getComponentState(instance));
-
- entity.setComponent(dto);
- return entity;
+ private Authorizable getAuthorizable(final Connectable connectable) {
+ switch (connectable.getConnectableType()) {
+ case REMOTE_INPUT_PORT:
+ case REMOTE_OUTPUT_PORT:
+ final String rpgId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
+ return authorizableLookup.getRemoteProcessGroup(rpgId);
+ default:
+ return authorizableLookup.getLocalConnectable(connectable.getIdentifier());
+ }
}
private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
@@ -3820,7 +3888,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
- final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates();
+ final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryUrl = remoteCoordinates.getRegistryUrl();
@@ -3868,17 +3936,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
- return updateProcessGroup(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified);
+ return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true);
}
@Override
- public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
- final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+ public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+ final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
final RevisionUpdate snapshot = updateComponent(user, revision,
processGroupNode,
- () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified),
+ () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings),
processGroup -> dtoFactory.createProcessGroupDto(processGroup));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
@@ -4243,27 +4311,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
this.leaderElectionManager = leaderElectionManager;
}
+ public void setRegistryDAO(RegistryDAO registryDao) {
+ this.registryDAO = registryDao;
+ }
+
public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
this.flowRegistryClient = flowRegistryClient;
-
- // temp code to load the registry client cache
- final Set registryIdentifiers = flowRegistryClient.getRegistryIdentifiers();
- if (registryIdentifiers != null) {
-
- for (final String registryIdentifier : registryIdentifiers) {
- final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryIdentifier);
-
- final RegistryDTO registry = new RegistryDTO();
- registry.setId(registryIdentifier);
- registry.setName(flowRegistry.getName());
- registry.setUri(flowRegistry.getURL());
- registry.setDescription("Default client for storing Flow Revisions to the local disk.");
-
- final RegistryEntity registryEntity = new RegistryEntity();
- registryEntity.setComponent(registry);
-
- registryCache.put(registryIdentifier, new Tuple(new Revision(0L, null, registryIdentifier), registry));
- }
- }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index ebed0ad7c9..11c548ff17 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,12 +16,57 @@
*/
package org.apache.nifi.web.api;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLStreamReader;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@@ -42,6 +87,8 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.registry.flow.FlowRegistryUtils;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -64,6 +111,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
@@ -104,55 +152,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.stream.XMLStreamReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
@@ -826,7 +831,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
final Map headers = new HashMap<>();
- final MultivaluedMap requestEntity = new MultivaluedHashMap();
+ final MultivaluedMap requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
@@ -983,7 +988,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
final Map headers = new HashMap<>();
- final MultivaluedMap requestEntity = new MultivaluedHashMap();
+ final MultivaluedMap requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
@@ -1513,9 +1518,10 @@ public class ProcessGroupResource extends ApplicationResource {
* Adds the specified process group.
*
* @param httpServletRequest request
- * @param groupId The group id
+ * @param groupId The group id
* @param requestProcessGroupEntity A processGroupEntity
* @return A processGroupEntity
+ * @throws IOException if the request indicates that the Process Group should be imported from a Flow Registry and NiFi is unable to communicate with the Flow Registry
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@@ -1547,7 +1553,7 @@ public class ProcessGroupResource extends ApplicationResource {
@ApiParam(
value = "The process group configuration details.",
required = true
- ) final ProcessGroupEntity requestProcessGroupEntity) {
+ ) final ProcessGroupEntity requestProcessGroupEntity) throws IOException {
if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
@@ -1574,6 +1580,28 @@ public class ProcessGroupResource extends ApplicationResource {
}
requestProcessGroupEntity.getComponent().setParentGroupId(groupId);
+ // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
+ // Step 2: Retrieve flow from Flow Registry
+ // Step 3: Resolve Bundle info
+ // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
+ // Step 5: If any of the components is a Restricted Component, then we must authorize the user
+ // for write access to the RestrictedComponents resource
+ // Step 6: Replicate the request or call serviceFacade.updateProcessGroup
+
+ final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
+ if (versionControlInfo != null) {
+ // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
+ // Step 2: Retrieve flow from Flow Registry
+ final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
+
+ // Step 3: Resolve Bundle info
+ BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
+
+ // Step 4: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
+ requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
+ }
+
+ // Step 6: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);
}
@@ -1584,8 +1612,23 @@ public class ProcessGroupResource extends ApplicationResource {
lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+
+ // Step 5: If any of the components is a Restricted Component, then we must authorize the user
+ // for write access to the RestrictedComponents resource
+ final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
+ if (versionedFlowSnapshot != null) {
+ final boolean containsRestrictedComponent = FlowRegistryUtils.containsRestrictedComponent(versionedFlowSnapshot.getFlowContents());
+ if (containsRestrictedComponent) {
+ lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ }
+ }
+ },
+ () -> {
+ final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
+ if (versionedFlowSnapshot != null) {
+ serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
+ }
},
- null,
processGroupGroupEntity -> {
// set the processor id as appropriate
processGroupGroupEntity.getComponent().setId(generateUuid());
@@ -1593,6 +1636,16 @@ public class ProcessGroupResource extends ApplicationResource {
// create the process group contents
final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId());
final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent());
+
+ final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
+ if (flowSnapshot != null) {
+ final RevisionDTO revisionDto = entity.getRevision();
+ final String newGroupId = entity.getComponent().getId();
+ final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
+ serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
+ versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
+ }
+
populateRemainingProcessGroupEntityContent(entity);
// generate a 201 created response
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 9fbd5e8373..27216a4b8f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,12 +17,39 @@
package org.apache.nifi.web.api;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.Authorizer;
@@ -30,12 +57,11 @@ import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
+import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
@@ -48,7 +74,6 @@ import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager;
import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
@@ -71,36 +96,12 @@ import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
@Path("/versions")
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
@@ -163,7 +164,7 @@ public class VersionsResource extends ApplicationResource {
@POST
- @Consumes(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("start-requests")
@ApiOperation(
@@ -402,10 +403,10 @@ public class VersionsResource extends ApplicationResource {
final NodeResponse clusterResponse;
try {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse();
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, null, Collections.emptyMap()).awaitMergedResponse();
+ getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -466,10 +467,10 @@ public class VersionsResource extends ApplicationResource {
final NodeResponse clusterResponse;
try {
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse();
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, null, Collections.emptyMap()).awaitMergedResponse();
+ getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -942,7 +943,7 @@ public class VersionsResource extends ApplicationResource {
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
- discoverCompatibleBundles(flowSnapshot.getFlowContents());
+ BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// Step 1: Determine which components will be affected by updating the version
final Set affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
@@ -956,6 +957,12 @@ public class VersionsResource extends ApplicationResource {
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component affected.
authorizeAffectedComponents(lookup, affectedComponents);
+
+ final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
+ final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
+ if (containsRestrictedComponents) {
+ lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ }
},
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
@@ -1070,7 +1077,7 @@ public class VersionsResource extends ApplicationResource {
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
- discoverCompatibleBundles(flowSnapshot.getFlowContents());
+ BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// Step 1: Determine which components will be affected by updating the version
final Set affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
@@ -1084,6 +1091,12 @@ public class VersionsResource extends ApplicationResource {
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component affected.
authorizeAffectedComponents(lookup, affectedComponents);
+
+ final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
+ final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
+ if (containsRestrictedComponents) {
+ lookup.getRestrictedComponents().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ }
},
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
@@ -1258,7 +1271,7 @@ public class VersionsResource extends ApplicationResource {
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation();
- serviceFacade.updateProcessGroup(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified);
+ serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
}
asyncRequest.setLastUpdated(new Date());
@@ -1384,50 +1397,6 @@ public class VersionsResource extends ApplicationResource {
this.dtoFactory = dtoFactory;
}
- private BundleDTO createBundleDto(final Bundle bundle) {
- final BundleDTO dto = new BundleDTO();
- dto.setArtifact(bundle.getArtifact());
- dto.setGroup(dto.getGroup());
- dto.setVersion(dto.getVersion());
- return dto;
- }
-
- /**
- * Discovers the compatible bundle details for the components in the specified snippet.
- *
- * @param versionedGroup the versioned group
- */
- private void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) {
- if (versionedGroup.getProcessors() != null) {
- versionedGroup.getProcessors().forEach(processor -> {
- final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle()));
-
- final Bundle bundle = new Bundle();
- bundle.setArtifact(coordinate.getId());
- bundle.setGroup(coordinate.getGroup());
- bundle.setVersion(coordinate.getVersion());
- processor.setBundle(bundle);
- });
- }
-
- if (versionedGroup.getControllerServices() != null) {
- versionedGroup.getControllerServices().forEach(controllerService -> {
- final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle()));
-
- final Bundle bundle = new Bundle();
- bundle.setArtifact(coordinate.getId());
- bundle.setGroup(coordinate.getGroup());
- bundle.setVersion(coordinate.getVersion());
- controllerService.setBundle(bundle);
- });
- }
-
- if (versionedGroup.getProcessGroups() != null) {
- versionedGroup.getProcessGroups().forEach(processGroup -> {
- discoverCompatibleBundles(processGroup);
- });
- }
- }
private static class ActiveRequest {
private static final long MAX_REQUEST_LOCK_NANOS = TimeUnit.MINUTES.toNanos(1L);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ae3fc56341..8e0f0c7d81 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -113,6 +113,7 @@ import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
@@ -3713,6 +3714,15 @@ public final class DtoFactory {
return nodeDto;
}
+ public RegistryDTO createRegistryDto(FlowRegistry registry) {
+ final RegistryDTO dto = new RegistryDTO();
+ dto.setDescription(registry.getDescription());
+ dto.setId(registry.getIdentifier());
+ dto.setName(registry.getName());
+ dto.setUri(registry.getURL());
+ return dto;
+ }
+
/* setters */
public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 615f00b65d..29e5f7d7cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -82,6 +82,7 @@ import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.ReportingTask;
@@ -1643,6 +1644,9 @@ public class ControllerFacade implements Authorizable {
return dto;
}
+ public void verifyComponentTypes(VersionedProcessGroup versionedFlow) {
+ flowController.verifyComponentTypesInSnippet(versionedFlow);
+ }
private ComponentSearchResultDTO search(final String searchStr, final ProcessorNode procNode) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 806979f4f8..650d4b3292 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -113,10 +113,11 @@ public interface ProcessGroupDAO {
* @param proposedSnapshot Flow the new version of the flow
* @param versionControlInformation the new Version Control Information
* @param componentIdSeed the seed value to use for generating ID's for new components
+ * @param updateSettings whether or not to update the process group's name and position
* @return the process group
*/
ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
- boolean verifyNotModified);
+ boolean verifyNotModified, boolean updateSettings);
/**
* Applies the given Version Control Information to the Process Group
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java
new file mode 100644
index 0000000000..83b5c6df9b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RegistryDAO.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.dao;
+
+import java.util.Set;
+
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.web.api.dto.RegistryDTO;
+
+public interface RegistryDAO {
+
+ FlowRegistry createFlowRegistry(RegistryDTO registryDto);
+
+ FlowRegistry getFlowRegistry(String registryId);
+
+ Set getFlowRegistries();
+
+ FlowRegistry removeFlowRegistry(String registryId);
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
new file mode 100644
index 0000000000..eb2ac76454
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.dao.impl;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.RegistryDTO;
+import org.apache.nifi.web.dao.RegistryDAO;
+
+public class FlowRegistryDAO implements RegistryDAO {
+ private FlowRegistryClient flowRegistryClient;
+
+ @Override
+ public FlowRegistry createFlowRegistry(final RegistryDTO registryDto) {
+ return flowRegistryClient.addFlowRegistry(registryDto.getId(), registryDto.getName(), registryDto.getUri(), registryDto.getDescription());
+ }
+
+ @Override
+ public FlowRegistry getFlowRegistry(final String registryId) {
+ final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
+ if (registry == null) {
+ throw new ResourceNotFoundException("Unable to find Flow Registry with id '" + registryId + "'");
+ }
+
+ return registry;
+ }
+
+ @Override
+ public Set getFlowRegistries() {
+ return flowRegistryClient.getRegistryIdentifiers().stream()
+ .map(flowRegistryClient::getFlowRegistry)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public FlowRegistry removeFlowRegistry(final String registryId) {
+ final FlowRegistry registry = flowRegistryClient.removeFlowRegistry(registryId);
+ if (registry == null) {
+ throw new ResourceNotFoundException("Unable to find Flow Registry with id '" + registryId + "'");
+ }
+ return registry;
+ }
+
+ public void setFlowRegistryClient(FlowRegistryClient client) {
+ this.flowRegistryClient = client;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index f842a8c8ad..7828337690 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -246,6 +246,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
return group;
}
+ @Override
public ProcessGroup disconnectVersionControl(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.disconnectVersionControl();
@@ -254,9 +255,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
- final String componentIdSeed, final boolean verifyNotModified) {
+ final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
- group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified);
+ group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
final StandardVersionControlInformation svci = new StandardVersionControlInformation(
versionControlInformation.getRegistryId(),
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 99a4f1c7d7..e71de670d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -125,6 +125,9 @@
+
+
+
@@ -182,6 +185,7 @@
+