mirror of https://github.com/apache/nifi.git
NIFI-7241: When updating Process Group to match VersionedProcessGroup, remove any connections before recursing into child groups. This ensures that if a Port exists in child group A and is connected to a port in child group B, if the VersionedProcessGroup indicates to remove the port, that connection will be removed before attempting to remove the port. Updating and adding connections must still be done last, after all components have been added. But missing connections can be removed earlier.
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4136.
This commit is contained in:
parent
290bd378d5
commit
d4a2afc25c
|
@ -3805,7 +3805,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
|
||||
if (service == null) {
|
||||
service = addControllerService(group, proposedService, componentIdSeed);
|
||||
LOG.info("Added {} to {}", service, this);
|
||||
LOG.info("Added {} to {}", service, group);
|
||||
servicesAdded.put(proposedService.getIdentifier(), service);
|
||||
}
|
||||
|
||||
|
@ -3837,6 +3837,28 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
controllerServicesRemoved.remove(proposedService.getIdentifier());
|
||||
}
|
||||
|
||||
// Before we can update child groups, we must first remove any connections that are connected to those child groups' input/output ports.
|
||||
// We cannot add or update connections yet, though. That must be done at the end, as it's possible that the component that is the source/destination of the connection
|
||||
// has not yet been added.
|
||||
final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
|
||||
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
|
||||
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
|
||||
final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
|
||||
|
||||
for (final VersionedConnection proposedConnection : proposed.getConnections()) {
|
||||
connectionsRemoved.remove(proposedConnection.getIdentifier());
|
||||
}
|
||||
|
||||
// Connections must be the first thing to remove, not the last. Otherwise, we will fail
|
||||
// to remove a component if it has a connection going to it!
|
||||
for (final String removedVersionedId : connectionsRemoved) {
|
||||
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
|
||||
LOG.info("Removing {} from {}", connection, group);
|
||||
group.removeConnection(connection);
|
||||
flowManager.onConnectionRemoved(connection);
|
||||
}
|
||||
|
||||
|
||||
// Child groups
|
||||
final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
|
||||
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
|
||||
|
@ -4028,12 +4050,7 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
|
||||
|
||||
// Connections
|
||||
final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
|
||||
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
|
||||
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
|
||||
final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
|
||||
|
||||
// Add and update Connections
|
||||
for (final VersionedConnection proposedConnection : proposed.getConnections()) {
|
||||
final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
|
||||
if (connection == null) {
|
||||
|
@ -4047,21 +4064,10 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
updateConnection(connection, proposedConnection);
|
||||
LOG.info("Updated {}", connection);
|
||||
}
|
||||
|
||||
connectionsRemoved.remove(proposedConnection.getIdentifier());
|
||||
}
|
||||
|
||||
// Remove components that exist in the local flow but not the remote flow.
|
||||
|
||||
// Connections must be the first thing to remove, not the last. Otherwise, we will fail
|
||||
// to remove a component if it has a connection going to it!
|
||||
for (final String removedVersionedId : connectionsRemoved) {
|
||||
final Connection connection = connectionsByVersionedId.get(removedVersionedId);
|
||||
LOG.info("Removing {} from {}", connection, group);
|
||||
group.removeConnection(connection);
|
||||
flowManager.onConnectionRemoved(connection);
|
||||
}
|
||||
|
||||
// Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
|
||||
// We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
|
||||
// then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
|
||||
|
|
Loading…
Reference in New Issue