mirror of https://github.com/apache/nifi.git
NIFI-5695: Fixed bug that caused ports to not properly map to their correct child group on Flow Import if the child group is independently versioned
This closes #3070. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
5eb5e96b16
commit
270ce8570d
|
@ -3933,6 +3933,24 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return port.get();
|
||||
}
|
||||
|
||||
// Attempt to locate child group by versioned component id
|
||||
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
|
||||
.filter(child -> child.getVersionedComponentId().isPresent())
|
||||
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
|
||||
.findFirst();
|
||||
|
||||
if (optionalSpecifiedGroup.isPresent()) {
|
||||
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
|
||||
return specifiedGroup.getInputPorts().stream()
|
||||
.filter(component -> component.getVersionedComponentId().isPresent())
|
||||
.filter(component -> id.equals(component.getVersionedComponentId().get()))
|
||||
.findAny()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
// If no child group matched the versioned component id, then look at all child groups. This is done because
|
||||
// in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
|
||||
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
|
||||
return group.getProcessGroups().stream()
|
||||
.flatMap(gr -> gr.getInputPorts().stream())
|
||||
.filter(component -> component.getVersionedComponentId().isPresent())
|
||||
|
@ -3950,6 +3968,24 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
return port.get();
|
||||
}
|
||||
|
||||
// Attempt to locate child group by versioned component id
|
||||
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
|
||||
.filter(child -> child.getVersionedComponentId().isPresent())
|
||||
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
|
||||
.findFirst();
|
||||
|
||||
if (optionalSpecifiedGroup.isPresent()) {
|
||||
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
|
||||
return specifiedGroup.getOutputPorts().stream()
|
||||
.filter(component -> component.getVersionedComponentId().isPresent())
|
||||
.filter(component -> id.equals(component.getVersionedComponentId().get()))
|
||||
.findAny()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
// If no child group matched the versioned component id, then look at all child groups. This is done because
|
||||
// in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
|
||||
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
|
||||
return group.getProcessGroups().stream()
|
||||
.flatMap(gr -> gr.getOutputPorts().stream())
|
||||
.filter(component -> component.getVersionedComponentId().isPresent())
|
||||
|
|
|
@ -71,6 +71,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
|
@ -228,6 +229,20 @@ public class NiFiRegistryFlowMapper {
|
|||
return versionedId;
|
||||
}
|
||||
|
||||
private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E {
|
||||
if (currentVersionedId.isPresent()) {
|
||||
return currentVersionedId.get();
|
||||
} else {
|
||||
final String resolved = versionedComponentIds.get(componentId);
|
||||
if (resolved == null) {
|
||||
throw exceptionSupplier.get();
|
||||
}
|
||||
|
||||
return resolved;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getGroupId(final String groupId) {
|
||||
return versionedComponentIds.get(groupId);
|
||||
}
|
||||
|
@ -265,39 +280,27 @@ public class NiFiRegistryFlowMapper {
|
|||
public ConnectableComponent mapConnectable(final Connectable connectable) {
|
||||
final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier());
|
||||
|
||||
final Optional<String> versionedId = connectable.getVersionedComponentId();
|
||||
if (versionedId.isPresent()) {
|
||||
component.setId(versionedId.get());
|
||||
} else {
|
||||
final String resolved = versionedComponentIds.get(connectable.getIdentifier());
|
||||
if (resolved == null) {
|
||||
throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component");
|
||||
}
|
||||
|
||||
component.setId(resolved);
|
||||
}
|
||||
final String versionedId = getIdOrThrow(connectable.getVersionedComponentId(), connectable.getIdentifier(),
|
||||
() -> new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component"));
|
||||
component.setId(versionedId);
|
||||
|
||||
component.setComments(connectable.getComments());
|
||||
|
||||
final String groupId;
|
||||
if (connectable instanceof RemoteGroupPort) {
|
||||
final RemoteGroupPort port = (RemoteGroupPort) connectable;
|
||||
final RemoteProcessGroup rpg = port.getRemoteProcessGroup();
|
||||
final Optional<String> rpgVersionedId = rpg.getVersionedComponentId();
|
||||
final String groupId;
|
||||
if (rpgVersionedId.isPresent()) {
|
||||
groupId = rpgVersionedId.get();
|
||||
} else {
|
||||
final String resolved = versionedComponentIds.get(rpg.getIdentifier());
|
||||
if (resolved == null) {
|
||||
throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to");
|
||||
}
|
||||
groupId = getIdOrThrow(rpgVersionedId, rpg.getIdentifier(),
|
||||
() -> new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to"));
|
||||
|
||||
groupId = resolved;
|
||||
}
|
||||
|
||||
component.setGroupId(groupId);
|
||||
} else {
|
||||
component.setGroupId(connectable.getProcessGroupIdentifier());
|
||||
groupId = getIdOrThrow(connectable.getProcessGroup().getVersionedComponentId(), connectable.getProcessGroupIdentifier(),
|
||||
() -> new IllegalArgumentException("Unable to find the Versioned Component ID for the Process Group that " + connectable + " belongs to"));
|
||||
}
|
||||
|
||||
component.setGroupId(groupId);
|
||||
|
||||
component.setName(connectable.getName());
|
||||
component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
|
||||
return component;
|
||||
|
|
Loading…
Reference in New Issue