NIFI-10096 Corrected nested inherited Parameter Context loading

Signed-off-by: Joe Gresock <jgresock@gmail.com>
This closes #6114.
This commit is contained in:
exceptionfactory 2022-06-09 15:54:30 -05:00 committed by Joe Gresock
parent d925989811
commit c51f70b6e6
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
1 changed files with 50 additions and 23 deletions

View File

@ -115,6 +115,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@ -155,7 +156,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
// determine if the controller already had flow sync'd to it
final boolean flowAlreadySynchronized = controller.isFlowSynchronized();
logger.info("Synching FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized);
logger.info("Synchronizing FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized);
// If bundle update strategy is configured to allow for compatible bundles, update any components to use compatible bundles if
// the exact bundle does not exist.
@ -533,50 +534,71 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
}
private void inheritParameterContexts(final FlowController controller, final VersionedDataflow dataflow) {
final ParameterContextManager parameterContextManager = controller.getFlowManager().getParameterContextManager();
// Add any parameter context that doesn't yet exist. We have to add all contexts before updating them because
// one context may reference another. We need that reference to exist before we try to create the reference.
final Map<String, ParameterContext> parameterContextsByName = parameterContextManager.getParameterContextNameMapping();
controller.getFlowManager().withParameterContextResolution(() -> {
for (final VersionedParameterContext versionedParameterContext : dataflow.getParameterContexts()) {
inheritParameterContext(versionedParameterContext, controller.getFlowManager(), parameterContextsByName);
final List<VersionedParameterContext> parameterContexts = dataflow.getParameterContexts();
// Build mapping of name to context for resolution of inherited contexts
final Map<String, VersionedParameterContext> namedParameterContexts = parameterContexts.stream()
.collect(
Collectors.toMap(VersionedParameterContext::getName, Function.identity())
);
for (final VersionedParameterContext versionedParameterContext : parameterContexts) {
inheritParameterContext(versionedParameterContext, controller.getFlowManager(), namedParameterContexts);
}
});
}
private void inheritParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager, final Map<String, ParameterContext> parameterContextsByName) {
final ParameterContext existingContext = parameterContextsByName.get(versionedParameterContext.getName());
private void inheritParameterContext(
final VersionedParameterContext versionedParameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
) {
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
final ParameterContext existingContext = contextManager.getParameterContextNameMapping().get(versionedParameterContext.getName());
if (existingContext == null) {
addParameterContext(versionedParameterContext, flowManager);
addParameterContext(versionedParameterContext, flowManager, namedParameterContexts);
} else {
updateParameterContext(versionedParameterContext, existingContext, flowManager);
updateParameterContext(versionedParameterContext, existingContext, flowManager, namedParameterContexts);
}
}
private void addParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager) {
private void addParameterContext(
final VersionedParameterContext versionedParameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
flowManager.createParameterContext(versionedParameterContext.getInstanceIdentifier(), versionedParameterContext.getName(), parameters, referenceIds);
logger.info("Added Parameter Context {}", versionedParameterContext.getName());
}
private List<String> findReferencedParameterContextIds(final VersionedParameterContext versionedParameterContext, final ParameterContextManager contextManager) {
private List<String> findReferencedParameterContextIds(
final VersionedParameterContext versionedParameterContext,
final ParameterContextManager contextManager,
final Map<String, VersionedParameterContext> namedParameterContexts
) {
final List<String> referenceIds = new ArrayList<>();
final Map<String, ParameterContext> parameterContextsByName = contextManager.getParameterContextNameMapping();
if (versionedParameterContext.getInheritedParameterContexts() != null) {
for (final String inheritedContextName : versionedParameterContext.getInheritedParameterContexts()) {
final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName);
if (existingContext == null) {
logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}",
versionedParameterContext.getName(), inheritedContextName, inheritedContextName);
// Lookup inherited Parameter Context Name in Versioned Data Flow
final VersionedParameterContext inheritedParameterContext = namedParameterContexts.get(inheritedContextName);
if (inheritedParameterContext == null) {
// Lookup inherited Parameter Context Name in Parameter Context Manager
final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName);
if (existingContext == null) {
logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}",
versionedParameterContext.getName(), inheritedContextName, inheritedContextName);
} else {
referenceIds.add(existingContext.getIdentifier());
}
} else {
referenceIds.add(existingContext.getIdentifier());
referenceIds.add(inheritedParameterContext.getInstanceIdentifier());
}
}
}
@ -611,7 +633,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
return parameters;
}
private void updateParameterContext(final VersionedParameterContext versionedParameterContext, final ParameterContext parameterContext, final FlowManager flowManager) {
private void updateParameterContext(
final VersionedParameterContext versionedParameterContext,
final ParameterContext parameterContext,
final FlowManager flowManager,
final Map<String, VersionedParameterContext> namedParameterContexts
) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final Map<String, String> currentValues = new HashMap<>();
@ -652,7 +679,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
}
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
final List<ParameterContext> referencedContexts = inheritedContextIds.stream()
.map(contextManager::getParameterContext)
.collect(Collectors.toList());