From 72660af479d726061eb76854398cb88ae4f25c30 Mon Sep 17 00:00:00 2001 From: timeabarna <38653567+timeabarna@users.noreply.github.com> Date: Mon, 4 Oct 2021 09:14:36 +0200 Subject: [PATCH] NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel (#5402) * NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel * NIFI-9229 Addressing review comments modified log message and added comments --- .../nifi/groups/StandardProcessGroup.java | 37 ++- .../integration/versioned/ImportFlowIT.java | 248 ++++++++++++++++++ 2 files changed, 281 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index f933cf7201..9c8d26454f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -158,6 +158,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -4293,16 +4294,30 @@ public final class StandardProcessGroup implements ProcessGroup { //As Input Port (IP1) originally belonged to PGA the new connection would be incorrectly linked to the old Input Port //instead of the one being in PGB, so it needs to be removed first before updating the connections. - for (final String removedVersionedId : inputPortsRemoved) { + Iterator inputPortsRemovedIterator = inputPortsRemoved.iterator(); + while (inputPortsRemovedIterator.hasNext()) { + final String removedVersionedId = inputPortsRemovedIterator.next(); final Port port = inputPortsByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", port, group); - group.removeInputPort(port); + try { + group.removeInputPort(port); + inputPortsRemovedIterator.remove(); + } catch (IllegalStateException e) { + LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group); + } } - for (final String removedVersionedId : outputPortsRemoved) { + Iterator outputPortsRemovedIterator = outputPortsRemoved.iterator(); + while (outputPortsRemovedIterator.hasNext()) { + final String removedVersionedId = outputPortsRemovedIterator.next(); final Port port = outputPortsByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", port, group); - group.removeOutputPort(port); + try { + group.removeOutputPort(port); + outputPortsRemovedIterator.remove(); + } catch (IllegalStateException e) { + LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group); + } } // Add and update Connections @@ -4343,6 +4358,20 @@ public final class StandardProcessGroup implements ProcessGroup { group.removeFunnel(funnel); } + //Removing remaining input ports + for (final String removedVersionedId : inputPortsRemoved) { + final Port port = inputPortsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", port, group); + group.removeInputPort(port); + } + + //Removing remaining output ports + for (final String removedVersionedId : outputPortsRemoved) { + final Port port = outputPortsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", port, group); + group.removeOutputPort(port); + } + // Now that all input/output ports have been removed, we should be able to update // all ports to the final name that was proposed in the new flow version. for (final Map.Entry portAndFinalName : proposedPortFinalNames.entrySet()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java index c4a00b34fc..cdf7a65fb6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java @@ -20,12 +20,14 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.StandardSnippet; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.integration.DirectInjectionExtensionManager; @@ -443,6 +445,222 @@ public class ImportFlowIT extends FrameworkIntegrationTest { assertTrue(groupA.getInputPorts().isEmpty()); } + @Test + public void testUpdateFlowWithOutputPortChangedToFunnelInAConnection() { + //Testing use case NIFI-9229 + //Create Process Group + final ProcessGroup group = createProcessGroup("p-group-id", "P Group", getRootGroup()); + + //Create Processor under Process Group + final ProcessorNode processor = createProcessorNode(GenerateProcessor.class, group); + + //Add Output Port to Process Group + final Port port = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port"); + group.addOutputPort(port); + + //Create Connection between Processor and Input Port + final Connection connection = connect(group, processor, port, processor.getRelationships()); + + //Create a snapshot + final VersionedFlowSnapshot version1 = createFlowSnapshot(group); + + //Create Funnel under Process Group + Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id"); + group.addFunnel(funnel); + + //Modify connection's destination from Output Port to Funnel + connection.setDestination(funnel); + + //Delete Output Port + group.removeOutputPort(port); + + //Create another snapshot + final VersionedFlowSnapshot version2 = createFlowSnapshot(group); + + //Change Process Group version to Version 1 + group.updateFlow(version1, null, false, true, true); + + //Process Group should have only one Output Port, One Processor and One connection + assertEquals(1, group.getProcessors().size()); + assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId()); + assertEquals(1, group.getConnections().size()); + assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertEquals(1, group.getOutputPorts().size()); + assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId()); + assertTrue(group.getFunnels().isEmpty()); + assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId()); + + //Change Process Group version to Version 2 + group.updateFlow(version2, null, false, true, true); + + //Process Group should have a Funnel, a Processor, a Connection and no Output Ports + assertTrue(group.getOutputPorts().isEmpty()); + assertEquals(1, group.getProcessors().size()); + assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId()); + assertEquals(1, group.getConnections().size()); + assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertEquals(1, group.getFunnels().size()); + assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId()); + assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId()); + } + + @Test + public void testUpdateFlowWithModifyingConnectionDeletingAndMovingPort() { + //Create Process Group A + final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup()); + + //Create Process Group B under Process Group A + final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA); + + //Add Input port under Process Group B + final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port"); + groupB.addInputPort(inputPort); + + //Add Processor 1 under Process Group A + final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA); + + //Add Processor 2 under Process Group A + final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA); + + //Add Output Port under Process Group A + final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port"); + groupA.addOutputPort(outputPort); + + //Connect Processor 1 and Output Port as Connection 1 + final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships()); + + //Connect Processor 1 and Input Port as Connection 2 + final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships()); + + //Create a snapshot + final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA); + + //Modify Connection 1 to point to Processor 2 + connection1.setDestination(processor2); + + //Move Output Port to Process Group B + moveOutputPort(outputPort, groupB); + + //Create another snapshot + final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA); + + //Delete connection 2 + groupA.removeConnection(connection2); + + //Delete Input Port + groupB.removeInputPort(inputPort); + + //Create another snapshot + final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA); + + //Change Process Group version to Version 1 + groupA.updateFlow(version1, null, false, true, true); + + //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port + assertEquals(2, groupA.getProcessors().size()); + assertEquals(2, groupA.getConnections().size()); + assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId()); + assertEquals(1, groupA.getOutputPorts().size()); + assertEquals(1, groupA.getProcessGroups().size()); + assertEquals(1, groupB.getInputPorts().size()); + + //Change Process Group version to Version 2 + groupA.updateFlow(version2, null, false, true, true); + + //Connection1 destination changed to Processor2 and Output Port moved to Process Group B + assertTrue(groupA.getOutputPorts().isEmpty()); + assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId()); + assertEquals(1, groupB.getOutputPorts().size()); + assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId()); + + //Change Process Group version to Version 3 + groupA.updateFlow(version3, null, false, true, true); + + //Connection2 and Input Port should be deleted + assertEquals(1, groupA.getConnections().size()); + assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertTrue(groupB.getInputPorts().isEmpty()); + } + + @Test + public void testUpdateFlowWithDeletingConnectionDeletingAndMovingPort() { + //Create Process Group A + final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup()); + + //Create Process Group B under Process Group A + final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA); + + //Add Input port under Process Group B + final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port"); + groupB.addInputPort(inputPort); + + //Add Processor 1 under Process Group A + final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA); + + //Add Processor 2 under Process Group A + final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA); + + //Add Output Port under Process Group A + final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port"); + groupA.addOutputPort(outputPort); + + //Connect Processor 1 and Output Port as Connection 1 + final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships()); + + //Connect Processor 1 and Input Port as Connection 2 + final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships()); + + //Create a snapshot + final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA); + + //Modify Connection 1 to point to Processor 2 + connection1.setDestination(processor2); + + //Delete Output Port + groupA.removeOutputPort(outputPort); + + //Create another snapshot + final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA); + + //Delete connection 2 + groupA.removeConnection(connection2); + + //Move Input Port to Process Group A + moveInputPort(inputPort, groupA); + + //Create another snapshot + final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA); + + //Change Process Group version to Version 1 + groupA.updateFlow(version1, null, false, true, true); + + //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port + assertEquals(2, groupA.getProcessors().size()); + assertEquals(2, groupA.getConnections().size()); + assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId()); + assertEquals(1, groupA.getOutputPorts().size()); + assertEquals(1, groupA.getProcessGroups().size()); + assertEquals(1, groupB.getInputPorts().size()); + + //Change Process Group version to Version 2 + groupA.updateFlow(version2, null, false, true, true); + + //Connection1 destination changed to Processor2 and Output Port deleted + assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId()); + assertTrue(groupA.getOutputPorts().isEmpty()); + assertTrue(groupB.getOutputPorts().isEmpty()); + + //Change Process Group version to Version 3 + groupA.updateFlow(version3, null, false, true, true); + + //Connection2 should be deleted and Input Port moved to Process Group A + assertEquals(1, groupA.getConnections().size()); + assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId()); + assertTrue(groupB.getInputPorts().isEmpty()); + assertEquals(1, groupA.getInputPorts().size()); + assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId()); + } + private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) { final ProcessGroup group = getFlowController().getFlowManager().createProcessGroup(groupId); group.setName(groupName); @@ -458,6 +676,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest { port.getProcessGroup().move(snippet, destination); } + private void moveOutputPort(final Port port, final ProcessGroup destination) { + final StandardSnippet snippet = new StandardSnippet(); + snippet.setParentGroupId(port.getProcessGroupIdentifier()); + snippet.addOutputPorts(Collections.singletonMap(port.getIdentifier(), null)); + + port.getProcessGroup().move(snippet, destination); + } + private Set getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) { final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager()); @@ -495,6 +721,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final List processorNodes; final List controllerServiceNodes; final List inputPorts; + final List outputPorts; + final List funnels; final List connections; final List processGroups; final Set versionedProcessGroups; @@ -503,12 +731,16 @@ public class ImportFlowIT extends FrameworkIntegrationTest { processorNodes = processors; controllerServiceNodes = controllerServices; inputPorts = Collections.EMPTY_LIST; + outputPorts = Collections.EMPTY_LIST; + funnels = Collections.EMPTY_LIST; connections = Collections.EMPTY_LIST; versionedProcessGroups = Collections.EMPTY_SET; } else { processorNodes = new ArrayList<>(group.getProcessors()); controllerServiceNodes = new ArrayList<>(group.getControllerServices(false)); inputPorts = new ArrayList<>(group.getInputPorts()); + outputPorts = new ArrayList<>(group.getOutputPorts()); + funnels = new ArrayList<>(group.getFunnels()); connections = new ArrayList<>(group.getConnections()); processGroups = new ArrayList<>(group.getProcessGroups()); @@ -541,6 +773,20 @@ public class ImportFlowIT extends FrameworkIntegrationTest { inputPort.setVersionedComponentId(versionedInputPort.getIdentifier()); } + final Set versionedOutputPorts = new HashSet<>(); + for (final Port outputPort : outputPorts) { + final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort); + versionedOutputPorts.add(versionedOutputPort); + outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier()); + } + + final Set versionedFunnels = new HashSet<>(); + for (final Funnel funnel : funnels) { + final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel); + versionedFunnels.add(versionedFunnel); + funnel.setVersionedComponentId(versionedFunnel.getIdentifier()); + } + final Set versionedConnections = new HashSet<>(); for (final Connection connection : connections) { final VersionedConnection versionedConnection = flowMapper.mapConnection(connection); @@ -553,6 +799,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { flowContents.setControllerServices(versionedServices); flowContents.setProcessGroups(versionedProcessGroups); flowContents.setInputPorts(versionedInputPorts); + flowContents.setOutputPorts(versionedOutputPorts); + flowContents.setFunnels(versionedFunnels); flowContents.setConnections(versionedConnections); final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);