mirror of https://github.com/apache/nifi.git
NIFI-6985 Use correct versioned parameter contexts when child process groups are version controlled
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #3962.
This commit is contained in:
parent
8d214f99a4
commit
08d0352ac1
|
@ -128,6 +128,7 @@ import org.apache.nifi.util.FlowDifferenceFilters;
|
|||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.ReflectionUtils;
|
||||
import org.apache.nifi.util.SnippetUtils;
|
||||
import org.apache.nifi.web.ResourceNotFoundException;
|
||||
import org.apache.nifi.web.Revision;
|
||||
import org.apache.nifi.web.api.dto.TemplateDTO;
|
||||
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
|
||||
|
@ -3781,14 +3782,21 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
|
||||
final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
|
||||
|
||||
// if there is a nested process group that is versioned controlled, make sure get the param contexts that go with that snapshot
|
||||
// instead of the ones from the parent which would have been passed in to this method
|
||||
Map<String, VersionedParameterContext> childParameterContexts = versionedParameterContexts;
|
||||
if (childCoordinates != null && updateDescendantVersionedGroups) {
|
||||
childParameterContexts = getVersionedParameterContexts(childCoordinates);
|
||||
}
|
||||
|
||||
if (childGroup == null) {
|
||||
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip, versionedParameterContexts);
|
||||
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip, childParameterContexts);
|
||||
flowManager.onProcessGroupAdded(added);
|
||||
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
|
||||
LOG.info("Added {} to {}", added, this);
|
||||
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
|
||||
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, true, updateDescendantVersionedGroups,
|
||||
variablesToSkip, versionedParameterContexts);
|
||||
variablesToSkip, childParameterContexts);
|
||||
LOG.info("Updated {}", childGroup);
|
||||
}
|
||||
|
||||
|
@ -4057,6 +4065,35 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String,VersionedParameterContext> getVersionedParameterContexts(final VersionedFlowCoordinates versionedFlowCoordinates) {
|
||||
final FlowRegistryClient flowRegistryClient = flowController.getFlowRegistryClient();
|
||||
|
||||
final String registryId = flowRegistryClient.getFlowRegistryId(versionedFlowCoordinates.getRegistryUrl());
|
||||
if (registryId == null) {
|
||||
throw new ResourceNotFoundException("Could not find any Flow Registry registered with url: " + versionedFlowCoordinates.getRegistryUrl());
|
||||
}
|
||||
|
||||
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
|
||||
if (flowRegistry == null) {
|
||||
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
|
||||
}
|
||||
|
||||
final String bucketId = versionedFlowCoordinates.getBucketId();
|
||||
final String flowId = versionedFlowCoordinates.getFlowId();
|
||||
final int flowVersion = versionedFlowCoordinates.getVersion();
|
||||
|
||||
try {
|
||||
final VersionedFlowSnapshot childSnapshot = flowRegistry.getFlowContents(bucketId, flowId, flowVersion, false);
|
||||
return childSnapshot.getParameterContexts();
|
||||
} catch (final NiFiRegistryException e) {
|
||||
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
|
||||
+ bucketId + ", Flow " + flowId + ", Version " + flowVersion);
|
||||
} catch (final IOException ioe) {
|
||||
throw new IllegalStateException(
|
||||
"Failed to communicate with Flow Registry when attempting to retrieve a versioned flow");
|
||||
}
|
||||
}
|
||||
|
||||
private ParameterContext createParameterContext(final VersionedParameterContext versionedParameterContext, final String parameterContextId) {
|
||||
final Map<String, Parameter> parameters = new HashMap<>();
|
||||
for (final VersionedParameter versionedParameter : versionedParameterContext.getParameters()) {
|
||||
|
|
Loading…
Reference in New Issue