mirror of https://github.com/apache/nifi.git
NIFI-5950: Use temp port names during flow updates
Use temporary values for port names when updated process groups as part of a Change Flow Version operation. This avoids the potential for a name conflict between a ports during the update process. Add a final step to the update process group logic to set the final name on all ports. This closes #3301.
This commit is contained in:
parent
3d408f2b30
commit
a940ff6343
|
@ -1877,7 +1877,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
|
||||
for (final Port port : retriever.getPorts(group)) {
|
||||
if (port.getName().equals(name)) {
|
||||
|
@ -3442,6 +3441,15 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
|
||||
final Set<String> variablesToSkip) throws ProcessorInstantiationException {
|
||||
|
||||
// During the flow update, we will use temporary names for process group ports. This is because port names must be
|
||||
// unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
|
||||
// For example, if a process group update involves removing/renaming port A, and then adding/updating port B where B is given
|
||||
// A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
|
||||
// same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
|
||||
// during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
|
||||
final Map<String, String> proposedInputPortNamesByPortId = new HashMap<>();
|
||||
final Map<String, String> proposedOutputPortNamesByPortId = new HashMap<>();
|
||||
|
||||
group.setComments(proposed.getComments());
|
||||
|
||||
if (updateName) {
|
||||
|
@ -3590,11 +3598,15 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
for (final VersionedPort proposedPort : proposed.getInputPorts()) {
|
||||
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
|
||||
if (port == null) {
|
||||
final Port added = addInputPort(group, proposedPort, componentIdSeed);
|
||||
final String temporaryName = generateTemporaryPortName(proposedPort);
|
||||
final Port added = addInputPort(group, proposedPort, componentIdSeed, temporaryName);
|
||||
proposedInputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
|
||||
flowManager.onInputPortAdded(added);
|
||||
LOG.info("Added {} to {}", added, this);
|
||||
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
||||
updatePort(port, proposedPort);
|
||||
final String temporaryName = generateTemporaryPortName(proposedPort);
|
||||
proposedInputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
|
||||
updatePort(port, proposedPort, temporaryName);
|
||||
LOG.info("Updated {}", port);
|
||||
} else {
|
||||
port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
|
||||
|
@ -3611,11 +3623,15 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
|
||||
final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
|
||||
if (port == null) {
|
||||
final Port added = addOutputPort(group, proposedPort, componentIdSeed);
|
||||
final String temporaryName = generateTemporaryPortName(proposedPort);
|
||||
final Port added = addOutputPort(group, proposedPort, componentIdSeed, temporaryName);
|
||||
proposedOutputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
|
||||
flowManager.onOutputPortAdded(added);
|
||||
LOG.info("Added {} to {}", added, this);
|
||||
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
|
||||
updatePort(port, proposedPort);
|
||||
final String temporaryName = generateTemporaryPortName(proposedPort);
|
||||
proposedOutputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
|
||||
updatePort(port, proposedPort, temporaryName);
|
||||
LOG.info("Updated {}", port);
|
||||
} else {
|
||||
port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
|
||||
|
@ -3771,6 +3787,31 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
group.removeOutputPort(port);
|
||||
}
|
||||
|
||||
// Now that all input/output ports have been removed, we should be able to update
|
||||
// all ports to the final name that was proposed in the new flow version.
|
||||
for (final Map.Entry<String, String> idAndName : proposedInputPortNamesByPortId.entrySet()) {
|
||||
final String portId = idAndName.getKey();
|
||||
final String portFinalName = idAndName.getValue();
|
||||
final Port port = getInputPort(portId);
|
||||
if (port == null) {
|
||||
LOG.warn("Expected to find input port with id={} but it was missing.", portId);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Updating {} to replace temporary name with final name", port);
|
||||
updatePortToSetFinalName(port, portFinalName);
|
||||
}
|
||||
for (final Map.Entry<String, String> idAndName : proposedOutputPortNamesByPortId.entrySet()) {
|
||||
final String portId = idAndName.getKey();
|
||||
final String portFinalName = idAndName.getValue();
|
||||
final Port port = getOutputPort(portId);
|
||||
if (port == null) {
|
||||
LOG.warn("Expected to find output port with id={} but it was missing.", portId);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Updating {} to replace temporary name with final name", port);
|
||||
updatePortToSetFinalName(port, portFinalName);
|
||||
}
|
||||
|
||||
for (final String removedVersionedId : labelsRemoved) {
|
||||
final Label label = labelsByVersionedId.get(removedVersionedId);
|
||||
LOG.info("Removing {} from {}", label, group);
|
||||
|
@ -3810,6 +3851,21 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return true;
|
||||
}
|
||||
|
||||
private String generateTemporaryPortName(final VersionedPort proposedPort) {
|
||||
final String versionedPortId = proposedPort.getIdentifier();
|
||||
final String proposedPortFinalName = proposedPort.getName();
|
||||
return proposedPortFinalName + " (" + versionedPortId + ")";
|
||||
}
|
||||
|
||||
private void updatePortToSetFinalName(final Port port, final String name) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
port.setName(name);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
|
||||
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
|
||||
|
||||
|
@ -4126,26 +4182,29 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return funnel;
|
||||
}
|
||||
|
||||
private void updatePort(final Port port, final VersionedPort proposed) {
|
||||
private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
|
||||
final String name = temporaryName != null ? temporaryName : proposed.getName();
|
||||
port.setComments(proposed.getComments());
|
||||
port.setName(proposed.getName());
|
||||
port.setName(name);
|
||||
port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
|
||||
}
|
||||
|
||||
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
|
||||
final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
|
||||
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
|
||||
final String name = temporaryName != null ? temporaryName : proposed.getName();
|
||||
final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
|
||||
port.setVersionedComponentId(proposed.getIdentifier());
|
||||
destination.addInputPort(port);
|
||||
updatePort(port, proposed);
|
||||
updatePort(port, proposed, temporaryName);
|
||||
|
||||
return port;
|
||||
}
|
||||
|
||||
private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
|
||||
final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
|
||||
private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
|
||||
final String name = temporaryName != null ? temporaryName : proposed.getName();
|
||||
final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
|
||||
port.setVersionedComponentId(proposed.getIdentifier());
|
||||
destination.addOutputPort(port);
|
||||
updatePort(port, proposed);
|
||||
updatePort(port, proposed, temporaryName);
|
||||
|
||||
return port;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue