NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel (#5402)

* NIFI-9229 Flow upgrade not possible if a Output Port changes to a funnel
* NIFI-9229 Addressing review comments modified log message and added comments
This commit is contained in:
timeabarna 2021-10-04 09:14:36 +02:00 committed by GitHub
parent cacd6bb88a
commit 72660af479
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 281 additions and 4 deletions

View File

@ -158,6 +158,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -4293,16 +4294,30 @@ public final class StandardProcessGroup implements ProcessGroup {
//As Input Port (IP1) originally belonged to PGA the new connection would be incorrectly linked to the old Input Port
//instead of the one being in PGB, so it needs to be removed first before updating the connections.
for (final String removedVersionedId : inputPortsRemoved) {
Iterator<String> inputPortsRemovedIterator = inputPortsRemoved.iterator();
while (inputPortsRemovedIterator.hasNext()) {
final String removedVersionedId = inputPortsRemovedIterator.next();
final Port port = inputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
group.removeInputPort(port);
try {
group.removeInputPort(port);
inputPortsRemovedIterator.remove();
} catch (IllegalStateException e) {
LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
}
}
for (final String removedVersionedId : outputPortsRemoved) {
Iterator<String> outputPortsRemovedIterator = outputPortsRemoved.iterator();
while (outputPortsRemovedIterator.hasNext()) {
final String removedVersionedId = outputPortsRemovedIterator.next();
final Port port = outputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
group.removeOutputPort(port);
try {
group.removeOutputPort(port);
outputPortsRemovedIterator.remove();
} catch (IllegalStateException e) {
LOG.info("Removing {} from {} not possible at the moment, will try again after updated the connections.", port, group);
}
}
// Add and update Connections
@ -4343,6 +4358,20 @@ public final class StandardProcessGroup implements ProcessGroup {
group.removeFunnel(funnel);
}
//Removing remaining input ports
for (final String removedVersionedId : inputPortsRemoved) {
final Port port = inputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
group.removeInputPort(port);
}
//Removing remaining output ports
for (final String removedVersionedId : outputPortsRemoved) {
final Port port = outputPortsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", port, group);
group.removeOutputPort(port);
}
// Now that all input/output ports have been removed, we should be able to update
// all ports to the final name that was proposed in the new flow version.
for (final Map.Entry<Port, String> portAndFinalName : proposedPortFinalNames.entrySet()) {

View File

@ -20,12 +20,14 @@ import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
@ -443,6 +445,222 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertTrue(groupA.getInputPorts().isEmpty());
}
@Test
public void testUpdateFlowWithOutputPortChangedToFunnelInAConnection() {
//Testing use case NIFI-9229
//Create Process Group
final ProcessGroup group = createProcessGroup("p-group-id", "P Group", getRootGroup());
//Create Processor under Process Group
final ProcessorNode processor = createProcessorNode(GenerateProcessor.class, group);
//Add Output Port to Process Group
final Port port = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
group.addOutputPort(port);
//Create Connection between Processor and Input Port
final Connection connection = connect(group, processor, port, processor.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
//Create Funnel under Process Group
Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id");
group.addFunnel(funnel);
//Modify connection's destination from Output Port to Funnel
connection.setDestination(funnel);
//Delete Output Port
group.removeOutputPort(port);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
//Change Process Group version to Version 1
group.updateFlow(version1, null, false, true, true);
//Process Group should have only one Output Port, One Processor and One connection
assertEquals(1, group.getProcessors().size());
assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
assertEquals(1, group.getConnections().size());
assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
assertEquals(1, group.getOutputPorts().size());
assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
assertTrue(group.getFunnels().isEmpty());
assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId());
//Change Process Group version to Version 2
group.updateFlow(version2, null, false, true, true);
//Process Group should have a Funnel, a Processor, a Connection and no Output Ports
assertTrue(group.getOutputPorts().isEmpty());
assertEquals(1, group.getProcessors().size());
assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
assertEquals(1, group.getConnections().size());
assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
assertEquals(1, group.getFunnels().size());
assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId());
assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId());
}
@Test
public void testUpdateFlowWithModifyingConnectionDeletingAndMovingPort() {
//Create Process Group A
final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
//Create Process Group B under Process Group A
final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
//Add Input port under Process Group B
final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
groupB.addInputPort(inputPort);
//Add Processor 1 under Process Group A
final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Processor 2 under Process Group A
final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Output Port under Process Group A
final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
groupA.addOutputPort(outputPort);
//Connect Processor 1 and Output Port as Connection 1
final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
//Connect Processor 1 and Input Port as Connection 2
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
//Move Output Port to Process Group B
moveOutputPort(outputPort, groupB);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
//Delete Input Port
groupB.removeInputPort(inputPort);
//Create another snapshot
final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
//Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
assertEquals(2, groupA.getProcessors().size());
assertEquals(2, groupA.getConnections().size());
assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
assertEquals(1, groupA.getOutputPorts().size());
assertEquals(1, groupA.getProcessGroups().size());
assertEquals(1, groupB.getInputPorts().size());
//Change Process Group version to Version 2
groupA.updateFlow(version2, null, false, true, true);
//Connection1 destination changed to Processor2 and Output Port moved to Process Group B
assertTrue(groupA.getOutputPorts().isEmpty());
assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
assertEquals(1, groupB.getOutputPorts().size());
assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
//Change Process Group version to Version 3
groupA.updateFlow(version3, null, false, true, true);
//Connection2 and Input Port should be deleted
assertEquals(1, groupA.getConnections().size());
assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
assertTrue(groupB.getInputPorts().isEmpty());
}
@Test
public void testUpdateFlowWithDeletingConnectionDeletingAndMovingPort() {
//Create Process Group A
final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
//Create Process Group B under Process Group A
final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
//Add Input port under Process Group B
final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
groupB.addInputPort(inputPort);
//Add Processor 1 under Process Group A
final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Processor 2 under Process Group A
final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Output Port under Process Group A
final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
groupA.addOutputPort(outputPort);
//Connect Processor 1 and Output Port as Connection 1
final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
//Connect Processor 1 and Input Port as Connection 2
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
//Delete Output Port
groupA.removeOutputPort(outputPort);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
//Move Input Port to Process Group A
moveInputPort(inputPort, groupA);
//Create another snapshot
final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
//Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
assertEquals(2, groupA.getProcessors().size());
assertEquals(2, groupA.getConnections().size());
assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
assertEquals(1, groupA.getOutputPorts().size());
assertEquals(1, groupA.getProcessGroups().size());
assertEquals(1, groupB.getInputPorts().size());
//Change Process Group version to Version 2
groupA.updateFlow(version2, null, false, true, true);
//Connection1 destination changed to Processor2 and Output Port deleted
assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
assertTrue(groupA.getOutputPorts().isEmpty());
assertTrue(groupB.getOutputPorts().isEmpty());
//Change Process Group version to Version 3
groupA.updateFlow(version3, null, false, true, true);
//Connection2 should be deleted and Input Port moved to Process Group A
assertEquals(1, groupA.getConnections().size());
assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
assertTrue(groupB.getInputPorts().isEmpty());
assertEquals(1, groupA.getInputPorts().size());
assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
}
private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) {
final ProcessGroup group = getFlowController().getFlowManager().createProcessGroup(groupId);
group.setName(groupName);
@ -458,6 +676,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
port.getProcessGroup().move(snippet, destination);
}
private void moveOutputPort(final Port port, final ProcessGroup destination) {
final StandardSnippet snippet = new StandardSnippet();
snippet.setParentGroupId(port.getProcessGroupIdentifier());
snippet.addOutputPorts(Collections.singletonMap(port.getIdentifier(), null));
port.getProcessGroup().move(snippet, destination);
}
private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
@ -495,6 +721,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final List<ProcessorNode> processorNodes;
final List<ControllerServiceNode> controllerServiceNodes;
final List<Port> inputPorts;
final List<Port> outputPorts;
final List<Funnel> funnels;
final List<Connection> connections;
final List<ProcessGroup> processGroups;
final Set<VersionedProcessGroup> versionedProcessGroups;
@ -503,12 +731,16 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processorNodes = processors;
controllerServiceNodes = controllerServices;
inputPorts = Collections.EMPTY_LIST;
outputPorts = Collections.EMPTY_LIST;
funnels = Collections.EMPTY_LIST;
connections = Collections.EMPTY_LIST;
versionedProcessGroups = Collections.EMPTY_SET;
} else {
processorNodes = new ArrayList<>(group.getProcessors());
controllerServiceNodes = new ArrayList<>(group.getControllerServices(false));
inputPorts = new ArrayList<>(group.getInputPorts());
outputPorts = new ArrayList<>(group.getOutputPorts());
funnels = new ArrayList<>(group.getFunnels());
connections = new ArrayList<>(group.getConnections());
processGroups = new ArrayList<>(group.getProcessGroups());
@ -541,6 +773,20 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
}
final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
for (final Port outputPort : outputPorts) {
final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort);
versionedOutputPorts.add(versionedOutputPort);
outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
}
final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
for (final Funnel funnel : funnels) {
final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel);
versionedFunnels.add(versionedFunnel);
funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
}
final Set<VersionedConnection> versionedConnections = new HashSet<>();
for (final Connection connection : connections) {
final VersionedConnection versionedConnection = flowMapper.mapConnection(connection);
@ -553,6 +799,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
flowContents.setControllerServices(versionedServices);
flowContents.setProcessGroups(versionedProcessGroups);
flowContents.setInputPorts(versionedInputPorts);
flowContents.setOutputPorts(versionedOutputPorts);
flowContents.setFunnels(versionedFunnels);
flowContents.setConnections(versionedConnections);
final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);