NIFI-10371: When a component is moved between groups, ensure that its versioned component id is unique within the destination group. Also ensure that when adding a connection to a PG with the VersionedComponentSynchronizer that we prefer obtaining source/destination by instance id instead of versioned id.

Fixed bug where ProcessGroup would inadvertently set the wrong component's Versioned Component ID to null when there was an ID conflict

This closes #6314

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-08-17 17:57:35 -04:00 committed by exceptionfactory
parent d2dbaa3c62
commit b012e9aad2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
7 changed files with 133 additions and 4 deletions

View File

@ -3072,7 +3072,24 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent) {
final String id = connectableComponent.getId();
// Always prefer the instance identifier, if it's available.
final Connectable connectable = getConnectable(group, connectableComponent, ConnectableComponent::getInstanceIdentifier);
if (connectable != null) {
LOG.debug("Found Connectable {} in Process Group {} by Instance ID {}", connectable, group, connectableComponent.getInstanceIdentifier());
return connectable;
}
// If we're synchronizing and the component is not available by the instance ID, lookup the component by the ID instead.
final Connectable connectableById = getConnectable(group, connectableComponent, ConnectableComponent::getId);
LOG.debug("Found no connectable in Process Group {} by Instance ID. Lookup by ID {} yielded {}", connectable, connectableComponent.getId(), connectableById);
return connectableById;
}
private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent, final Function<ConnectableComponent, String> idFunction) {
final String id = idFunction.apply(connectableComponent);
if (id == null) {
return null;
}
switch (connectableComponent.getType()) {
case FUNNEL:

View File

@ -612,6 +612,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
// Unique port check within the same group.
verifyPortUniqueness(port, inputPorts, this::getInputPortByName);
ensureUniqueVersionControlId(port, getInputPorts());
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
@ -695,6 +696,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
// Unique port check within the same group.
verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);
ensureUniqueVersionControlId(port, getOutputPorts());
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
@ -770,6 +772,8 @@ public final class StandardProcessGroup implements ProcessGroup {
writeLock.lock();
try {
ensureUniqueVersionControlId(group, getProcessGroups());
group.setParent(this);
group.getVariableRegistry().setParent(getVariableRegistry());
@ -877,6 +881,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier());
}
ensureUniqueVersionControlId(remoteGroup, getRemoteProcessGroups());
remoteGroup.setProcessGroup(this);
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup);
onComponentModified();
@ -958,6 +963,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
}
ensureUniqueVersionControlId(processor, getProcessors());
processor.setProcessGroup(this);
processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
@ -971,6 +978,50 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
/**
* A component's Versioned Component ID is used to link a component on the canvas to a component in a versioned flow.
* There may, however, be multiple instances of the same versioned flow in a single NiFi instance. In this case, we will have
* multiple components with the same Versioned Component ID. This is acceptable as long as no two components within the same Process Group
* have the same Versioned Component ID. However, it is not acceptable to have two components within the same Process Group that have the same
* Versioned Component ID. If this happens, we will have no way to know which component in our flow maps to which component in the versioned flow.
* We don't have an issue with this when a flow is imported, etc. because it is always imported to a new Process Group. However, because it's possible
* to move most components between groups, we can have a situation in which a component is moved to a higher group, and that can result in a conflict.
* In such a case, we handle this by nulling out the Versioned Component ID if there is a conflict. This essentially makes NiFi behave as if a component
* is copied & pasted instead of being moved whenever a conflict occurs.
*
* @param component the component whose Versioned Component ID should be nulled if there's a conflict
* @param componentsToCheck the components to check to determine if there's a conflict
*/
private void ensureUniqueVersionControlId(final org.apache.nifi.components.VersionedComponent component,
final Collection<? extends org.apache.nifi.components.VersionedComponent> componentsToCheck) {
final Optional<String> optionalVersionControlId = component.getVersionedComponentId();
if (!optionalVersionControlId.isPresent()) {
return;
}
final String versionControlId = optionalVersionControlId.get();
final boolean duplicateId = containsVersionedComponentId(componentsToCheck, versionControlId);
if (duplicateId) {
LOG.debug("Adding {} to {}, found conflicting Version Component ID {} so marking Version Component ID of {} as null", component, this, versionControlId, component);
component.setVersionedComponentId(null);
} else {
LOG.debug("Adding {} to {}, found no conflicting Version Component ID for ID {}", component, this, versionControlId);
}
}
private boolean containsVersionedComponentId(final Collection<? extends org.apache.nifi.components.VersionedComponent> components, final String id) {
for (final org.apache.nifi.components.VersionedComponent component : components) {
final Optional<String> optionalConnectableId = component.getVersionedComponentId();
if (optionalConnectableId.isPresent() && Objects.equals(optionalConnectableId.get(), id)) {
return true;
}
}
return false;
}
/**
* Looks for any property that is configured on the given component that references a Controller Service.
* If any exists, and that Controller Service is not accessible from this Process Group, then the given
@ -1185,6 +1236,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
ensureUniqueVersionControlId(connection, getConnections());
connection.setProcessGroup(this);
source.addConnection(connection);
if (source != destination) { // don't call addConnection twice if it's a self-looping connection.
@ -1401,6 +1453,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
}
ensureUniqueVersionControlId(label, getLabels());
label.setProcessGroup(this);
labels.put(label.getIdentifier(), label);
onComponentModified();
@ -2151,6 +2204,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier());
}
ensureUniqueVersionControlId(funnel, getFunnels());
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
flowManager.onFunnelAdded(funnel);

View File

@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -167,6 +168,15 @@ public abstract class AbstractPort implements Port {
@Override
public void setProcessGroup(final ProcessGroup newGroup) {
if (this.processGroup.get() != null && !Objects.equals(newGroup, this.processGroup.get())) {
// Process Group is changing. For a Port, we effectively want to consider this the same as
// deleting an old port and creating a new one, in terms of tracking the port to a versioned flow.
// This ensures that we have a unique versioned component id not only in the given process group but also
// between the given group and its parent and all children. This is important for ports because we can
// connect to/from them between Process Groups, so we need to ensure unique IDs.
versionedComponentId.set(null);
}
this.processGroup.set(newGroup);
}

View File

@ -409,7 +409,9 @@ public class FrameworkIntegrationTest {
final String uuid = getSimpleTypeName(processorType) + "-" + UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ProcessorNode procNode = flowController.getFlowManager().createProcessor(processorType, uuid, bundleCoordinate, Collections.emptySet(), true, true, null);
destination.addProcessor(procNode);
if (destination != null) {
destination.addProcessor(procNode);
}
return procNode;
}

View File

@ -21,17 +21,20 @@ import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.integration.processor.BiConsumerProcessor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.Revision;
import org.junit.Assert;
import org.junit.Test;
@ -42,8 +45,50 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
@Test
public void testConflictingVersionedComponentId() {
final ProcessorNode proc1 = createProcessorNode(BiConsumerProcessor.class, null);
getRootGroup().addProcessor(proc1);
final ProcessorNode proc2 = createProcessorNode(BiConsumerProcessor.class, null);
proc2.setVersionedComponentId("aaa");
getRootGroup().addProcessor(proc2);
// Ensure that id didn't change
assertEquals("aaa", proc2.getVersionedComponentId().get());
final ProcessorNode proc3 = createProcessorNode(BiConsumerProcessor.class, null);
proc3.setVersionedComponentId("bbb");
getRootGroup().addProcessor(proc3);
assertEquals("bbb", proc3.getVersionedComponentId().get());
final ProcessorNode proc4 = createProcessorNode(BiConsumerProcessor.class, null);
proc4.setVersionedComponentId("bbb");
getRootGroup().addProcessor(proc4);
// Ensure that versioned component id was nulled out
assertFalse(proc4.getVersionedComponentId().isPresent());
final ProcessGroup childGroup = getFlowController().getFlowManager().createProcessGroup("child");
childGroup.setName("child");
getRootGroup().addProcessGroup(childGroup);
final ProcessorNode proc5 = createProcessorNode(BiConsumerProcessor.class, null);
proc5.setVersionedComponentId("bbb");
childGroup.addProcessor(proc5);
assertEquals("bbb", proc5.getVersionedComponentId().get());
// Move processor from child group to parent group.
// This should null out the ID for proc5 and leave proc3 as is.
final StandardSnippet snippet = new StandardSnippet();
snippet.addProcessors(Collections.singletonMap(proc5.getIdentifier(), new Revision(0L, "abc", proc5.getIdentifier())));
childGroup.move(snippet, getRootGroup());
assertFalse(proc5.getVersionedComponentId().isPresent());
assertEquals("bbb", proc3.getVersionedComponentId().get());
}
@Test
public void testProcessGroupDefaults() {
// Connect two processors with default settings of the root process group

View File

@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.debug.frequency=1000000
# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository

View File

@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.debug.frequency=1000000
# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository