diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index cf9d6c5052..519089f048 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -614,16 +614,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private void removeMissingConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map connectionsByVersionedId) { final Set connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet()); + final Set connectionsRemovedDueToChangingSourceId = new HashSet<>(); for (final VersionedConnection proposedConnection : proposed.getConnections()) { connectionsRemoved.remove(proposedConnection.getIdentifier()); } + // Check for any case where there's an existing connection whose ID matches the proposed connection, but whose source doesn't match + // the proposed source ID. The source of a Connection should never change from one component to another. However, there are cases + // in which the Versioned Component ID might change, in order to avoid conflicts with sibling Process Groups. In such a case, we must remove + // the connection and create a new one, since we cannot simply change the source in the same way that we can change the destination. + for (final VersionedConnection proposedConnection : proposed.getConnections()) { + final Connection existingConnection = connectionsByVersionedId.get(proposedConnection.getIdentifier()); + + if (existingConnection != null) { + final String proposedSourceId = proposedConnection.getSource().getId(); + final String existingSourceId = existingConnection.getSource().getVersionedComponentId().orElse(null); + + if (!Objects.equals(proposedSourceId, existingSourceId)) { + connectionsRemovedDueToChangingSourceId.add(proposedConnection.getIdentifier()); + connectionsRemoved.add(proposedConnection.getIdentifier()); + } + } + } + for (final String removedVersionedId : connectionsRemoved) { final Connection connection = connectionsByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", connection, group); group.removeConnection(connection); } + + for (final String removedVersionedId : connectionsRemovedDueToChangingSourceId) { + connectionsByVersionedId.remove(removedVersionedId); + } } private void synchronizeConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map connectionsByVersionedId) {