From afdf49b3cd62fa3e363babf029be4debee269d03 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Wed, 1 Feb 2023 12:14:38 -0500 Subject: [PATCH] NIFI-11100: Limiting scope of external controller services during version sync to services outside the versioning context (#6898) --- ...tandardVersionedComponentSynchronizer.java | 71 +++++++++++-------- .../groups/FlowSynchronizationOptions.java | 19 ++++- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index f469658c7f..cf9d6c5052 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -256,7 +256,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen try { final Map parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null ? new HashMap<>() : versionedExternalFlow.getParameterProviders(); - synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences); + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group; + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } @@ -266,7 +267,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, - final Map parameterProviderReferences) + final Map parameterProviderReferences, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we @@ -378,7 +379,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Map controllerServicesByVersionedId = componentsById(group, grp -> grp.getControllerServices(false), ControllerServiceNode::getIdentifier, ControllerServiceNode::getVersionedComponentId); removeMissingControllerServices(group, proposed, controllerServicesByVersionedId); - synchronizeControllerServices(group, proposed, controllerServicesByVersionedId); + synchronizeControllerServices(group, proposed, controllerServicesByVersionedId, topLevelGroup); // Remove any connections that are not in the Proposed Process Group // Connections must be the first thing to remove, not the last. Otherwise, we will fail @@ -412,13 +413,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen removeMissingChildGroups(group, proposed, childGroupsByVersionedId); // Synchronize Child Process Groups - synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences); + synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup); synchronizeFunnels(group, proposed, funnelsByVersionedId); synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId); synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId); synchronizeLabels(group, proposed, labelsByVersionedId); - synchronizeProcessors(group, proposed, processorsByVersionedId); + synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup); synchronizeRemoteGroups(group, proposed, rpgsByVersionedId); } finally { // Make sure that we reset the connections @@ -504,7 +505,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, final Map childGroupsByVersionedId, - final Map parameterProviderReferences) throws ProcessorInstantiationException { + final Map parameterProviderReferences, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); @@ -524,7 +525,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables, - childParameterContexts, parameterProviderReferences); + childParameterContexts, parameterProviderReferences, topLevelGroup); context.getFlowManager().onProcessGroupAdded(added); added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize); LOG.info("Added {} to {}", added, group); @@ -538,14 +539,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen .build(); sync.setSynchronizationOptions(options); - sync.synchronize(childGroup, proposedChildGroup, childParameterContexts, parameterProviderReferences); + sync.synchronize(childGroup, proposedChildGroup, childParameterContexts, parameterProviderReferences, topLevelGroup); LOG.info("Updated {}", childGroup); } } } - private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId) { + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, + final ProcessGroup topLevelGroup) { // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each @@ -558,7 +560,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen for (final VersionedControllerService proposedService : proposed.getControllerServices()) { ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); if (service == null) { - service = addControllerService(group, proposedService, context.getComponentIdGenerator()); + service = addControllerService(group, proposedService, context.getComponentIdGenerator(), topLevelGroup); LOG.info("Added {} to {}", service, group); servicesAdded.put(proposedService.getIdentifier(), service); @@ -576,7 +578,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen continue; } - updateControllerService(addedService, proposedService); + updateControllerService(addedService, proposedService, topLevelGroup); } // Update all of the Controller Services to match the VersionedControllerService @@ -585,7 +587,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final VersionedControllerService proposedService = entry.getValue(); if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { - updateControllerService(service, proposedService); + updateControllerService(service, proposedService, topLevelGroup); LOG.info("Updated {}", service); } } @@ -920,16 +922,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } - private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId) + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); if (processor == null) { - final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator()); + final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); LOG.info("Added {} to {}", added, group); } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { - updateProcessor(processor, proposedProcessor); + updateProcessor(processor, proposedProcessor, topLevelGroup); LOG.info("Updated {}", processor); } else { processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); @@ -1099,7 +1102,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Set variablesToSkip, final Map versionedParameterContexts, - final Map parameterProviderReferences) throws ProcessorInstantiationException { + final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); final ProcessGroup group = context.getFlowManager().createProcessGroup(id); group.setVersionedComponentId(proposed.getIdentifier()); @@ -1116,12 +1119,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen .updateGroupSettings(true) .build(); sync.setSynchronizationOptions(options); - sync.synchronize(group, proposed, versionedParameterContexts, parameterProviderReferences); + sync.synchronize(group, proposed, versionedParameterContexts, parameterProviderReferences, topLevelGroup); return group; } - private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator) { + private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator, + final ProcessGroup topLevelGroup) { final String destinationId = destination == null ? "Controller" : destination.getIdentifier(); final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destinationId); LOG.debug("Adding Controller Service with ID {} of type {}", identifier, proposed.getType()); @@ -1137,7 +1141,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen destination.addControllerService(newService); } - updateControllerService(newService, proposed); + updateControllerService(newService, proposed, topLevelGroup); return newService; } @@ -1182,11 +1186,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen verifyCanSynchronize(controllerService, proposed); try { + final ProcessGroup topLevelGroup = synchronizationOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(synchronizationOptions.getTopLevelGroupId()) : group; + if (proposed == null) { serviceProvider.removeControllerService(controllerService); LOG.info("Successfully synchronized {} by removing it from the flow", controllerService); } else if (controllerService == null) { - final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator()); + final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator(), topLevelGroup); if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) { servicesToRestart.add(added); @@ -1194,7 +1200,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen LOG.info("Successfully synchronized {} by adding it to the flow", added); } else { - updateControllerService(controllerService, proposed); + updateControllerService(controllerService, proposed, topLevelGroup); if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) { servicesToRestart.add(controllerService); @@ -1250,7 +1256,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } - private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) { + private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed, final ProcessGroup topLevelGroup) { LOG.debug("Updating {}", service); service.pauseValidationTrigger(); @@ -1275,7 +1281,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values()); - final Map properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup()); + final Map properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup(), topLevelGroup); service.setProperties(properties, true, sensitiveDynamicPropertyNames); } finally { @@ -1313,7 +1319,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } private Map populatePropertiesMap(final ComponentNode componentNode, final Map proposedProperties, - final Map proposedPropertyDescriptors, final ProcessGroup group) { + final Map proposedPropertyDescriptors, + final ProcessGroup group, final ProcessGroup topLevelGroup) { // Explicitly set all existing properties to null, except for sensitive properties, so that if there isn't an entry in the proposedProperties // it will get removed from the processor. We don't do this for sensitive properties because when we retrieve the VersionedProcessGroup from registry, @@ -1348,7 +1355,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen String existingExternalServiceId = null; final String componentDescriptorValue = componentNode.getEffectivePropertyValue(descriptor); if (componentDescriptorValue != null) { - final ProcessGroup parentGroup = group.getParent(); + final ProcessGroup parentGroup = topLevelGroup.getParent(); if (parentGroup != null) { final ControllerServiceNode serviceNode = parentGroup.findControllerService(componentDescriptorValue, false, true); if (serviceNode != null) { @@ -2393,7 +2400,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } - private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final ComponentIdGenerator componentIdGenerator) throws ProcessorInstantiationException { + private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final ComponentIdGenerator componentIdGenerator, + final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); LOG.debug("Adding Processor with ID {} of type {}", identifier, proposed.getType()); @@ -2402,7 +2410,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen procNode.setVersionedComponentId(proposed.getIdentifier()); destination.addProcessor(procNode); - updateProcessor(procNode, proposed); + updateProcessor(procNode, proposed, topLevelGroup); // Notify the processor node that the configuration (properties, e.g.) has been restored final ProcessContext processContext = context.getProcessContextFactory().apply(procNode); @@ -2485,6 +2493,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen verifyCanSynchronize(processor, proposedProcessor, timeout); try { + final ProcessGroup topLevelGroup = synchronizationOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(synchronizationOptions.getTopLevelGroupId()) : group; if (proposedProcessor == null) { final Set stoppedDownstream = stopDownstreamComponents(processor, timeout, synchronizationOptions); toRestart.addAll(stoppedDownstream); @@ -2492,10 +2501,10 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen processor.getProcessGroup().removeProcessor(processor); LOG.info("Successfully synchronized {} by removing it from the flow", processor); } else if (processor == null) { - final ProcessorNode added = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator()); + final ProcessorNode added = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator(), topLevelGroup); LOG.info("Successfully synchronized {} by adding it to the flow", added); } else { - updateProcessor(processor, proposedProcessor); + updateProcessor(processor, proposedProcessor, topLevelGroup); LOG.info("Successfully synchronized {} by updating it to match proposed version", processor); } } catch (final Exception e) { @@ -2723,7 +2732,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } - private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException { + private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { LOG.debug("Updating Processor {}", processor); processor.pauseValidationTrigger(); @@ -2742,7 +2751,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(processor, proposed.getProperties(), proposed.getPropertyDescriptors().values()); - final Map properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup()); + final Map properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup(), topLevelGroup); processor.setProperties(properties, true, sensitiveDynamicPropertyNames); processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS); processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java index a1ca499ae4..fad983aeba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java @@ -36,6 +36,7 @@ public class FlowSynchronizationOptions { private final Duration componentStopTimeout; private final ComponentStopTimeoutAction timeoutAction; private final ScheduledStateChangeListener scheduledStateChangeListener; + private final String topLevelGroupId; private FlowSynchronizationOptions(final Builder builder) { this.componentIdGenerator = builder.componentIdGenerator; @@ -51,6 +52,7 @@ public class FlowSynchronizationOptions { this.componentStopTimeout = builder.componentStopTimeout; this.timeoutAction = builder.timeoutAction; this.scheduledStateChangeListener = builder.scheduledStateChangeListener; + this.topLevelGroupId = builder.topLevelGroupId; } public ComponentIdGenerator getComponentIdGenerator() { @@ -105,6 +107,10 @@ public class FlowSynchronizationOptions { return scheduledStateChangeListener; } + public String getTopLevelGroupId() { + return topLevelGroupId; + } + public static class Builder { private ComponentIdGenerator componentIdGenerator; private Function componentComparisonIdLookup; @@ -119,7 +125,7 @@ public class FlowSynchronizationOptions { private PropertyDecryptor propertyDecryptor = value -> value; private Duration componentStopTimeout = Duration.ofSeconds(30); private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION; - + private String topLevelGroupId; /** * Specifies the Component ID Generator to use for generating UUID's of components that are to be added to a ProcessGroup @@ -281,6 +287,16 @@ public class FlowSynchronizationOptions { return new FlowSynchronizationOptions(this); } + /** + * Specifies the identifier of the top level group that scopes the synchronization. + * @param topLevelGroupId the top level group id + * @return the builder + */ + public Builder topLevelGroupId(final String topLevelGroupId) { + this.topLevelGroupId = topLevelGroupId; + return this; + } + public static Builder from(final FlowSynchronizationOptions options) { final Builder builder = new Builder(); builder.componentIdGenerator = options.getComponentIdGenerator(); @@ -296,6 +312,7 @@ public class FlowSynchronizationOptions { builder.componentStopTimeout = options.getComponentStopTimeout(); builder.timeoutAction = options.getComponentStopTimeoutAction(); builder.scheduledStateChangeListener = options.getScheduledStateChangeListener(); + builder.topLevelGroupId = options.getTopLevelGroupId(); return builder; }