NIFI-11159 Fixing connections with source having reassigned id

This commit is contained in:
Bence Simon 2023-02-09 19:32:14 +01:00 committed by Mark Payne
parent a66727c4d1
commit b0ec28a452
1 changed files with 23 additions and 0 deletions

View File

@ -614,16 +614,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private void removeMissingConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) { private void removeMissingConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) {
final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet()); final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
final Set<String> connectionsRemovedDueToChangingSourceId = new HashSet<>();
for (final VersionedConnection proposedConnection : proposed.getConnections()) { for (final VersionedConnection proposedConnection : proposed.getConnections()) {
connectionsRemoved.remove(proposedConnection.getIdentifier()); connectionsRemoved.remove(proposedConnection.getIdentifier());
} }
// Check for any case where there's an existing connection whose ID matches the proposed connection, but whose source doesn't match
// the proposed source ID. The source of a Connection should never change from one component to another. However, there are cases
// in which the Versioned Component ID might change, in order to avoid conflicts with sibling Process Groups. In such a case, we must remove
// the connection and create a new one, since we cannot simply change the source in the same way that we can change the destination.
for (final VersionedConnection proposedConnection : proposed.getConnections()) {
final Connection existingConnection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
if (existingConnection != null) {
final String proposedSourceId = proposedConnection.getSource().getId();
final String existingSourceId = existingConnection.getSource().getVersionedComponentId().orElse(null);
if (!Objects.equals(proposedSourceId, existingSourceId)) {
connectionsRemovedDueToChangingSourceId.add(proposedConnection.getIdentifier());
connectionsRemoved.add(proposedConnection.getIdentifier());
}
}
}
for (final String removedVersionedId : connectionsRemoved) { for (final String removedVersionedId : connectionsRemoved) {
final Connection connection = connectionsByVersionedId.get(removedVersionedId); final Connection connection = connectionsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", connection, group); LOG.info("Removing {} from {}", connection, group);
group.removeConnection(connection); group.removeConnection(connection);
} }
for (final String removedVersionedId : connectionsRemovedDueToChangingSourceId) {
connectionsByVersionedId.remove(removedVersionedId);
}
} }
private void synchronizeConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) { private void synchronizeConnections(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, Connection> connectionsByVersionedId) {