NIFI-11118: This closes #6909. When changing version of a versioned flow, ensure that we properly set the Version Control Information's StorageLocation for the NiFi Registry client in order to maintain backward compatibility

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2023-01-31 11:00:55 -05:00 committed by Joe Witt
parent b3319c7119
commit 28b9f6fc02
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
1 changed files with 27 additions and 3 deletions

View File

@ -19,6 +19,7 @@ package org.apache.nifi.registry.flow.mapping;
import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceCardinality;
@ -169,7 +170,9 @@ public class NiFiRegistryFlowMapper {
} }
coordinates.setRegistryUrl(getRegistryUrl(registry)); coordinates.setRegistryUrl(getRegistryUrl(registry));
coordinates.setStorageLocation(versionControlInfo.getStorageLocation() == null ?getRegistryUrl(registry) : versionControlInfo.getStorageLocation());
final String storageLocation = determineStorageLocation(registry, versionControlInfo);
coordinates.setStorageLocation(storageLocation);
coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
coordinates.setVersion(versionControlInfo.getVersion()); coordinates.setVersion(versionControlInfo.getVersion());
@ -193,12 +196,33 @@ public class NiFiRegistryFlowMapper {
} }
private boolean isNiFiRegistryClient(final FlowRegistryClientNode clientNode) {
return clientNode.getComponentType().endsWith("NifiRegistryFlowRegistryClient");
}
// This is specific for the {@code NifiRegistryFlowRegistryClient}, purely for backward compatibility // This is specific for the {@code NifiRegistryFlowRegistryClient}, purely for backward compatibility
private String getRegistryUrl(final FlowRegistryClientNode registry) { private String getRegistryUrl(final FlowRegistryClientNode registry) {
return registry.getComponentType().endsWith("NifiRegistryFlowRegistryClient") return isNiFiRegistryClient(registry) ? registry.getRawPropertyValue(registry.getPropertyDescriptor(REGISTRY_URL_DESCRIPTOR_NAME)) : "";
? registry.getRawPropertyValue(registry.getPropertyDescriptor(REGISTRY_URL_DESCRIPTOR_NAME)) : "";
} }
private String determineStorageLocation(final FlowRegistryClientNode registryClient, final VersionControlInformation versionControlInformation) {
final String explicitStorageLocation = versionControlInformation.getStorageLocation();
if (!StringUtils.isEmpty(explicitStorageLocation)) {
return explicitStorageLocation;
}
final String registryUrl = getRegistryUrl(registryClient);
if (StringUtils.isEmpty(registryUrl)) {
return "";
}
final String bucketId = versionControlInformation.getBucketIdentifier();
final String flowId = versionControlInformation.getFlowIdentifier();
final int version = versionControlInformation.getVersion();
return String.format("%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s", registryUrl, bucketId, flowId, version);
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider,
final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo) { final BiFunction<ProcessGroup, VersionedProcessGroup, Boolean> applyVersionControlInfo) {
final Set<String> allIncludedGroupsIds = group.findAllProcessGroups().stream() final Set<String> allIncludedGroupsIds = group.findAllProcessGroups().stream()