From 181d6809c126da862417e79fb5d794ed5f8eefac Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 11 Dec 2017 15:36:56 -0500 Subject: [PATCH] NIFI-4436: Bug fixes; ensure correct Exception types are thrown Signed-off-by: Matt Gilman --- .../manager/ProcessGroupEntityMerger.java | 3 + .../org/apache/nifi/groups/ProcessGroup.java | 2 +- .../nifi/registry/flow/FlowRegistry.java | 1 - .../nifi/controller/FlowController.java | 14 ++- .../nifi/groups/StandardProcessGroup.java | 50 +++++++--- .../flow/StandardFlowRegistryClient.java | 11 ++- .../org/apache/nifi/util/SnippetUtils.java | 99 +++++++++++++++++++ .../StandardFlowSynchronizerSpec.groovy | 2 + .../service/mock/MockProcessGroup.java | 2 +- .../apache/nifi/web/NiFiServiceFacade.java | 22 ++--- .../nifi/web/StandardNiFiServiceFacade.java | 72 ++++++++++---- .../nifi/web/api/ProcessGroupResource.java | 9 +- .../apache/nifi/web/api/VersionsResource.java | 4 +- .../impl/StandardControllerServiceDAO.java | 30 +++--- .../web/dao/impl/StandardProcessGroupDAO.java | 2 +- .../apache/nifi/web/util/SnippetUtils.java | 1 - 16 files changed, 246 insertions(+), 78 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java index d2eb74997e..d74fdeb1f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java @@ -48,6 +48,9 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger * * @param group group - * @param templateContents contents + * @param snippetContents contents */ - private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) { + private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO snippetContents) { // validate the names of Input Ports - for (final PortDTO port : templateContents.getInputPorts()) { + for (final PortDTO port : snippetContents.getInputPorts()) { if (group.getInputPortByName(port.getName()) != null) { throw new IllegalStateException("One or more of the proposed Port names is not available in the process group"); } } // validate the names of Output Ports - for (final PortDTO port : templateContents.getOutputPorts()) { + for (final PortDTO port : snippetContents.getOutputPorts()) { if (group.getOutputPortByName(port.getName()) != null) { throw new IllegalStateException("One or more of the proposed Port names is not available in the process group"); } } - verifyComponentTypesInSnippet(templateContents); + verifyComponentTypesInSnippet(snippetContents); + + SnippetUtils.verifyNoVersionControlConflicts(snippetContents, group); } + /** * Recursively finds all ConnectionDTO's * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 7d184dfe7b..5d5d0f4e18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -112,6 +112,7 @@ import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; +import org.apache.nifi.util.SnippetUtils; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; @@ -2294,6 +2295,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { verifyContents(snippet); verifyDestinationNotInSnippet(snippet, destination); + SnippetUtils.verifyNoVersionControlConflicts(snippet, this, destination); if (!isDisconnected(snippet)) { throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); @@ -3087,13 +3089,15 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void disconnectVersionControl() { + public void disconnectVersionControl(final boolean removeVersionedComponentIds) { writeLock.lock(); try { this.versionControlInfo.set(null); - // remove version component ids from each component (until another versioned PG is encountered) - applyVersionedComponentIds(this, id -> null); + if (removeVersionedComponentIds) { + // remove version component ids from each component (until another versioned PG is encountered) + applyVersionedComponentIds(this, id -> null); + } } finally { writeLock.unlock(); } @@ -3278,7 +3282,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Set knownVariables = getKnownVariableNames(); updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables); } catch (final ProcessorInstantiationException pie) { - throw new RuntimeException(pie); + throw new IllegalStateException("Failed to update flow", pie); } finally { writeLock.unlock(); } @@ -3366,7 +3370,9 @@ public final class StandardProcessGroup implements ProcessGroup { group.setVariables(updatedVariableMap); final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates(); - if (remoteCoordinates != null) { + if (remoteCoordinates == null) { + group.disconnectVersionControl(false); + } else { final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl()); final String bucketId = remoteCoordinates.getBucketId(); final String flowId = remoteCoordinates.getFlowId(); @@ -3681,8 +3687,6 @@ public final class StandardProcessGroup implements ProcessGroup { } private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) { - // TODO: I think we can get rid of all of those LinkedHashSet's now in the VersionedProcessGroup because - /// the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup. long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits(); UUID uuid; @@ -3733,7 +3737,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { return flowController.createPrioritizer(prioritizerName); } catch (final Exception e) { - throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier()); + throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier()); } }) .collect(Collectors.toList()); @@ -4016,7 +4020,14 @@ public final class StandardProcessGroup implements ProcessGroup { // The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this // to the instance ID of the Controller Service. final String serviceVersionedComponentId = entry.getValue(); - final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); + String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); + if (instanceId == null) { + // We didn't find the instance ID based on the Versioned Component ID. So we want to just + // leave the value set to whatever it currently is, if it's currently set. + final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build(); + instanceId = currentProperties.get(propertyDescriptor); + } + value = instanceId == null ? serviceVersionedComponentId : instanceId; } else { value = entry.getValue(); @@ -4169,15 +4180,22 @@ public final class StandardProcessGroup implements ProcessGroup { + " reverted to its original form before changing the version."); } } + + verifyNoDescendantsWithLocalModifications("be updated"); } final VersionedProcessGroup flowContents = updatedFlow.getFlowContents(); if (verifyConnectionRemoval) { // Determine which Connections have been removed. final Map removedConnectionByVersionedId = new HashMap<>(); + + // Populate the 'removedConnectionByVersionId' map with all Connections. We key off of the connection's VersionedComponentID + // if it is populated. Otherwise, we key off of its actual ID. We do this because it allows us to then remove from this Map + // any connection that does exist in the proposed flow. This results in us having a Map whose values are those Connections + // that were removed. We can then check for any connections that have data in them. If any Connection is to be removed but + // has data, then we should throw an IllegalStateException. findAllConnections().stream() - .filter(conn -> conn.getVersionedComponentId().isPresent()) - .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn)); + .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn)); final Set proposedFlowConnectionIds = new HashSet<>(); findAllConnectionIds(flowContents, proposedFlowConnectionIds); @@ -4252,8 +4270,8 @@ public final class StandardProcessGroup implements ProcessGroup { if (!proposedProcessGroups.containsKey(versionedId)) { // Process Group was removed. throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup - + " that exists locally has one or more Templates, and the proposed flow does not contain this Process Group. " - + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to chnage the version of the flow."); + + " that exists locally has one or more Templates, and the proposed flow does not contain these templates. " + + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow."); } } @@ -4430,6 +4448,12 @@ public final class StandardProcessGroup implements ProcessGroup { + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before " + "this action can be performed on the parent Process Group."); } + + if (flowState == VersionedFlowState.SYNC_FAILURE) { + throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and " + + "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be " + + "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background."); + } } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java index 8a2447ddea..4f98a2beea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClient.java @@ -43,6 +43,13 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { @Override public void addFlowRegistry(final FlowRegistry registry) { + final boolean duplicateName = registryById.values().stream() + .anyMatch(reg -> reg.getName().equals(registry.getName())); + + if (duplicateName) { + throw new IllegalStateException("Cannot add Flow Registry because a Flow Registry already exists with the name " + registry.getName()); + } + final FlowRegistry existing = registryById.putIfAbsent(registry.getIdentifier(), registry); if (existing != null) { throw new IllegalStateException("Cannot add Flow Registry " + registry + " because a Flow Registry already exists with the ID " + registry.getIdentifier()); @@ -58,7 +65,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) { final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false); if (sslContext == null && uriScheme.equalsIgnoreCase("https")) { - throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl + throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. " + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https."); } @@ -68,7 +75,7 @@ public class StandardFlowRegistryClient implements FlowRegistryClient { } else if (uriScheme.equalsIgnoreCase("http") || uriScheme.equalsIgnoreCase("https")) { final SSLContext sslContext = SslContextFactory.createSslContext(nifiProperties, false); if (sslContext == null && uriScheme.equalsIgnoreCase("https")) { - throw new RuntimeException("Failed to create Flow Registry for URI " + registryUrl + throw new IllegalStateException("Failed to create Flow Registry for URI " + registryUrl + " because this NiFi is not configured with a Keystore/Truststore, so it is not capable of communicating with a secure Registry. " + "Please populate NiFi's Keystore/Truststore properties or connect to a NiFi Registry over http instead of https."); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java index b482169441..9c04559882 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/SnippetUtils.java @@ -16,11 +16,15 @@ */ package org.apache.nifi.util; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.web.api.dto.ComponentDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import java.util.ArrayList; import java.util.Collection; @@ -307,4 +311,99 @@ public final class SnippetUtils { connection.setBends(bends); } } + + public static void verifyNoVersionControlConflicts(final Snippet snippet, final ProcessGroup parentGroup, final ProcessGroup destination) { + if (snippet == null) { + return; + } + if (snippet.getProcessGroups() == null) { + return; + } + + final List vcis = new ArrayList<>(); + for (final String groupId : snippet.getProcessGroups().keySet()) { + final ProcessGroup group = parentGroup.getProcessGroup(groupId); + if (group != null) { + findAllVersionControlInfo(group, vcis); + } + } + + verifyNoDuplicateVersionControlInfo(destination, vcis); + } + + public static void verifyNoVersionControlConflicts(final FlowSnippetDTO snippetContents, final ProcessGroup destination) { + final List vcis = new ArrayList<>(); + for (final ProcessGroupDTO childGroup : snippetContents.getProcessGroups()) { + findAllVersionControlInfo(childGroup, vcis); + } + + verifyNoDuplicateVersionControlInfoDtos(destination, vcis); + } + + private static void verifyNoDuplicateVersionControlInfoDtos(final ProcessGroup group, final Collection snippetVcis) { + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + for (final VersionControlInformationDTO snippetVci : snippetVcis) { + if (vci.getBucketIdentifier().equals(snippetVci.getBucketId()) && vci.getFlowIdentifier().equals(snippetVci.getFlowId())) { + throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is " + + "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control " + + "cannot contain a child Process Group that points to the same Versioned Flow."); + } + } + } + + final ProcessGroup parent = group.getParent(); + if (parent != null) { + verifyNoDuplicateVersionControlInfoDtos(parent, snippetVcis); + } + } + + private static void verifyNoDuplicateVersionControlInfo(final ProcessGroup group, final Collection snippetVcis) { + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + for (final VersionControlInformation snippetVci : snippetVcis) { + if (vci.getBucketIdentifier().equals(snippetVci.getBucketIdentifier()) && vci.getFlowIdentifier().equals(snippetVci.getFlowIdentifier())) { + throw new IllegalArgumentException("Cannot place the given Process Group into the desired destination because the destination group or one of its ancestor groups is " + + "under Version Control and one of the selected Process Groups is also under Version Control with the same Flow. A Process Group that is under Version Control " + + "cannot contain a child Process Group that points to the same Versioned Flow."); + } + } + } + + final ProcessGroup parent = group.getParent(); + if (parent != null) { + verifyNoDuplicateVersionControlInfo(parent, snippetVcis); + } + } + + + private static void findAllVersionControlInfo(final ProcessGroupDTO dto, final List found) { + final VersionControlInformationDTO vci = dto.getVersionControlInformation(); + if (vci != null) { + found.add(vci); + } + + final FlowSnippetDTO contents = dto.getContents(); + if (contents != null) { + for (final ProcessGroupDTO child : contents.getProcessGroups()) { + findAllVersionControlInfo(child, found); + } + } + } + + private static void findAllVersionControlInfo(final ProcessGroup group, final List found) { + if (group == null) { + return; + } + + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + found.add(vci); + } + + for (final ProcessGroup childGroup : group.findAllProcessGroups()) { + findAllVersionControlInfo(childGroup, found); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy index 7483228e2b..897d77e8a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy @@ -127,6 +127,8 @@ class StandardFlowSynchronizerSpec extends Specification { } } } + _ * processGroup.findAllRemoteProcessGroups() >> [] + positionableMocksById.put(pgId, processGroup) return processGroup } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index d006cffa8d..95e2d6a4b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -667,7 +667,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void disconnectVersionControl() { + public void disconnectVersionControl(final boolean removeVersionedComponentIds) { this.versionControlInfo = null; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 514cd1848a..6c20eac022 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -23,7 +23,6 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedProcessGroup; @@ -117,7 +116,6 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; -import java.io.IOException; import java.util.Date; import java.util.List; import java.util.Map; @@ -423,11 +421,12 @@ public interface NiFiServiceFacade { * with the given id * * @param versionControlInfo the information about the versioned flow + * @param versionedProcessGroup the contents to be imported * @param groupId the ID of the Process Group where the flow should be instantiated * * @throws IllegalStateException if the flow cannot be imported into the specified group */ - void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId); + void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup versionedProcessGroup, String groupId); /** * Creates a new Template based off the specified snippet. @@ -1295,7 +1294,7 @@ public interface NiFiServiceFacade { * was last synchronized with the Flow Registry * @throws IllegalStateException if the Process Group with the given ID is not under version control */ - FlowComparisonEntity getLocalModifications(String processGroupId) throws IOException, NiFiRegistryException; + FlowComparisonEntity getLocalModifications(String processGroupId); /** * Returns the Version Control information for the Process Group with the given ID @@ -1314,9 +1313,9 @@ public interface NiFiServiceFacade { * @param flow the flow to add to the registry * @return a VersionedFlow that is fully populated, including identifiers * - * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiCoreException if unable to register flow */ - VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, NiFiRegistryException; + VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow); /** * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry @@ -1337,7 +1336,7 @@ public interface NiFiServiceFacade { * @param flowId the ID of the flow * @return the VersionedFlow that was deleted */ - VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId) throws IOException, NiFiRegistryException; + VersionedFlow deleteVersionedFlow(String registryId, String bucketId, String flowId); /** * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id @@ -1349,10 +1348,9 @@ public interface NiFiServiceFacade { * @param expectedVersion the version to save the flow as * @return the snapshot that represents what was stored in the registry * - * @throws IOException if unable to communicate with the Flow Registry + * @throws NiFiCoreException if unable to register the snapshot with the flow registry */ - VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion) - throws IOException, NiFiRegistryException; + VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments, int expectedVersion); /** * Updates the Version Control Information on the Process Group with the given ID @@ -1386,7 +1384,7 @@ public interface NiFiServiceFacade { * * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found */ - VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException; + VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows); /** * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return @@ -1406,7 +1404,7 @@ public interface NiFiServiceFacade { * @param user the user making the request * @return the set of all components that would be affected by updating the Process Group */ - Set getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException; + Set getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user); /** * Verifies that the Process Group with the given identifier can be updated to the proposed flow diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index a2d6e41746..11bc39d335 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -95,6 +95,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedFlowState; @@ -1866,20 +1867,21 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) { + public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - verifyImportProcessGroup(versionControlInfo, group); + verifyImportProcessGroup(versionControlInfo, contents, group); } - private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) { + private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) { if (group == null) { return; } final VersionControlInformation vci = group.getVersionControlInformation(); if (vci != null) { - if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier()) - && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) + // Note that we do not compare the Registry ID here because there could be two registry clients + // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).. + if (Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " @@ -1887,7 +1889,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } } - verifyImportProcessGroup(vciDto, group.getParent()); + final Set childGroups = contents.getProcessGroups(); + if (childGroups != null) { + for (final VersionedProcessGroup childGroup : childGroups) { + final VersionedFlowCoordinates childCoordinates = childGroup.getVersionedFlowCoordinates(); + if (childCoordinates != null) { + final VersionControlInformationDTO childVci = new VersionControlInformationDTO(); + childVci.setBucketId(childCoordinates.getBucketId()); + childVci.setFlowId(childCoordinates.getFlowId()); + verifyImportProcessGroup(childVci, childGroup, group); + } + } + } + + verifyImportProcessGroup(vciDto, contents, group.getParent()); } @Override @@ -3447,8 +3462,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { entity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies())); entity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem())); entity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents())); - - // TODO - update to be user specific entity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers())); return entity; @@ -3722,13 +3735,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { + public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId); } - return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); + try { + return registry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser()); + } catch (final IOException | NiFiRegistryException e) { + throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + e.getMessage(), e); + } } @Override @@ -3752,7 +3769,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public FlowComparisonEntity getLocalModifications(final String processGroupId) throws IOException, NiFiRegistryException { + public FlowComparisonEntity getLocalModifications(final String processGroupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); if (versionControlInfo == null) { @@ -3765,11 +3782,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { + " but cannot find a Flow Registry with that identifier"); } - final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), - versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser()); + final VersionedFlowSnapshot versionedFlowSnapshot; + try { + versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true, NiFiUserUtils.getNiFiUser()); + } catch (final IOException | NiFiRegistryException e) { + throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e); + } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); @@ -3802,13 +3824,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException { + public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); } - return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser()); + try { + return registry.registerVersionedFlow(flow, NiFiUserUtils.getNiFiUser()); + } catch (final IOException | NiFiRegistryException e) { + throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); + } } private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException { @@ -3822,13 +3848,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, - final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) throws IOException, NiFiRegistryException { + final VersionedProcessGroup snapshot, final String comments, final int expectedVersion) { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); if (registry == null) { throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId); } - return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser()); + try { + return registry.registerVersionedFlowSnapshot(flow, snapshot, comments, expectedVersion, NiFiUserUtils.getNiFiUser()); + } catch (final IOException | NiFiRegistryException e) { + throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); + } } @Override @@ -3881,7 +3911,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public Set getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException { + public Set getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); @@ -4057,7 +4087,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException { + public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) { final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId()); if (flowRegistry == null) { throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId()); @@ -4066,7 +4096,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot snapshot; try { snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser()); - } catch (final NiFiRegistryException e) { + } catch (final NiFiRegistryException | IOException e) { throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index b3ccefb450..a2c16ed70a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1644,10 +1644,6 @@ public class ProcessGroupResource extends ApplicationResource { // Step 6: Replicate the request or call serviceFacade.updateProcessGroup final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); - if (versionControlInfo != null) { - serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId); - } - if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 2: Retrieve flow from Flow Registry @@ -1670,6 +1666,11 @@ public class ProcessGroupResource extends ApplicationResource { requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot); } + if (versionControlInfo != null) { + final VersionedFlowSnapshot flowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); + serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId); + } + // Step 6: Replicate the request or call serviceFacade.updateProcessGroup if (isReplicateRequest()) { return replicate(HttpMethod.POST, requestProcessGroupEntity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 3090c6e348..1d4cd88fff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -1110,7 +1110,7 @@ public class VersionsResource extends ApplicationResource { // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors"); + final AsynchronousWebRequest request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors"); // Submit the request to be performed in the background final Consumer> updateTask = vcur -> { @@ -1275,7 +1275,7 @@ public class VersionsResource extends ApplicationResource { // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors"); + final AsynchronousWebRequest request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Affected Processors"); // Submit the request to be performed in the background final Consumer> updateTask = vcur -> { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 4d8e984c7a..5622097eed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -172,21 +172,23 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } } - controllerService.getProcessGroup().onComponentModified(); - - // For any component that references this Controller Service, find the component's Process Group - // and notify the Process Group that a component has been modified. This way, we know to re-calculate - // whether or not the Process Group has local modifications. final ProcessGroup group = controllerService.getProcessGroup(); - controllerService.getReferences().getReferencingComponents().stream() - .map(ConfiguredComponent::getProcessGroupIdentifier) - .filter(id -> !id.equals(group.getIdentifier())) - .forEach(groupId -> { - final ProcessGroup descendant = group.findProcessGroup(groupId); - if (descendant != null) { - descendant.onComponentModified(); - } - }); + if (group != null) { + group.onComponentModified(); + + // For any component that references this Controller Service, find the component's Process Group + // and notify the Process Group that a component has been modified. This way, we know to re-calculate + // whether or not the Process Group has local modifications. + controllerService.getReferences().getReferencingComponents().stream() + .map(ConfiguredComponent::getProcessGroupIdentifier) + .filter(id -> !id.equals(group.getIdentifier())) + .forEach(groupId -> { + final ProcessGroup descendant = group.findProcessGroup(groupId); + if (descendant != null) { + descendant.onComponentModified(); + } + }); + } return controllerService; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 52a18dc1dc..e7e85af04c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -274,7 +274,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou @Override public ProcessGroup disconnectVersionControl(final String groupId) { final ProcessGroup group = locateProcessGroup(flowController, groupId); - group.disconnectVersionControl(); + group.disconnectVersionControl(true); return group; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index 11bd1b800c..2dbd1ca18e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -90,7 +90,6 @@ public final class SnippetUtils { private DtoFactory dtoFactory; private AccessPolicyDAO accessPolicyDAO; - /** * Populates the specified snippet and returns the details. *