NIFI-11100: Limiting scope of external controller services during version sync to services outside the versioning context (#6898)

This commit is contained in:
Joe Gresock 2023-02-01 12:14:38 -05:00 committed by GitHub
parent 3f04fae5a5
commit afdf49b3cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 32 deletions

View File

@ -256,7 +256,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
try {
final Map<String, ParameterProviderReference> parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null
? new HashMap<>() : versionedExternalFlow.getParameterProviders();
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences);
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group;
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
}
@ -266,7 +267,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences)
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
@ -378,7 +379,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Map<String, ControllerServiceNode> controllerServicesByVersionedId = componentsById(group, grp -> grp.getControllerServices(false),
ControllerServiceNode::getIdentifier, ControllerServiceNode::getVersionedComponentId);
removeMissingControllerServices(group, proposed, controllerServicesByVersionedId);
synchronizeControllerServices(group, proposed, controllerServicesByVersionedId);
synchronizeControllerServices(group, proposed, controllerServicesByVersionedId, topLevelGroup);
// Remove any connections that are not in the Proposed Process Group
// Connections must be the first thing to remove, not the last. Otherwise, we will fail
@ -412,13 +413,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
// Synchronize Child Process Groups
synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences);
synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup);
synchronizeFunnels(group, proposed, funnelsByVersionedId);
synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
synchronizeLabels(group, proposed, labelsByVersionedId);
synchronizeProcessors(group, proposed, processorsByVersionedId);
synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup);
synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
} finally {
// Make sure that we reset the connections
@ -504,7 +505,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ProcessGroup> childGroupsByVersionedId,
final Map<String, ParameterProviderReference> parameterProviderReferences) throws ProcessorInstantiationException {
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
@ -524,7 +525,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables,
childParameterContexts, parameterProviderReferences);
childParameterContexts, parameterProviderReferences, topLevelGroup);
context.getFlowManager().onProcessGroupAdded(added);
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, group);
@ -538,14 +539,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
.build();
sync.setSynchronizationOptions(options);
sync.synchronize(childGroup, proposedChildGroup, childParameterContexts, parameterProviderReferences);
sync.synchronize(childGroup, proposedChildGroup, childParameterContexts, parameterProviderReferences, topLevelGroup);
LOG.info("Updated {}", childGroup);
}
}
}
private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId) {
private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId,
final ProcessGroup topLevelGroup) {
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
@ -558,7 +560,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
if (service == null) {
service = addControllerService(group, proposedService, context.getComponentIdGenerator());
service = addControllerService(group, proposedService, context.getComponentIdGenerator(), topLevelGroup);
LOG.info("Added {} to {}", service, group);
servicesAdded.put(proposedService.getIdentifier(), service);
@ -576,7 +578,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
continue;
}
updateControllerService(addedService, proposedService);
updateControllerService(addedService, proposedService, topLevelGroup);
}
// Update all of the Controller Services to match the VersionedControllerService
@ -585,7 +587,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final VersionedControllerService proposedService = entry.getValue();
if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
updateControllerService(service, proposedService);
updateControllerService(service, proposedService, topLevelGroup);
LOG.info("Updated {}", service);
}
}
@ -920,16 +922,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
}
private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId)
private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId,
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator());
final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
LOG.info("Added {} to {}", added, group);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor);
updateProcessor(processor, proposedProcessor, topLevelGroup);
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@ -1099,7 +1102,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Set<String> variablesToSkip,
final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences) throws ProcessorInstantiationException {
final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
final ProcessGroup group = context.getFlowManager().createProcessGroup(id);
group.setVersionedComponentId(proposed.getIdentifier());
@ -1116,12 +1119,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
.updateGroupSettings(true)
.build();
sync.setSynchronizationOptions(options);
sync.synchronize(group, proposed, versionedParameterContexts, parameterProviderReferences);
sync.synchronize(group, proposed, versionedParameterContexts, parameterProviderReferences, topLevelGroup);
return group;
}
private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator) {
private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final ComponentIdGenerator componentIdGenerator,
final ProcessGroup topLevelGroup) {
final String destinationId = destination == null ? "Controller" : destination.getIdentifier();
final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destinationId);
LOG.debug("Adding Controller Service with ID {} of type {}", identifier, proposed.getType());
@ -1137,7 +1141,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
destination.addControllerService(newService);
}
updateControllerService(newService, proposed);
updateControllerService(newService, proposed, topLevelGroup);
return newService;
}
@ -1182,11 +1186,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
verifyCanSynchronize(controllerService, proposed);
try {
final ProcessGroup topLevelGroup = synchronizationOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(synchronizationOptions.getTopLevelGroupId()) : group;
if (proposed == null) {
serviceProvider.removeControllerService(controllerService);
LOG.info("Successfully synchronized {} by removing it from the flow", controllerService);
} else if (controllerService == null) {
final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator());
final ControllerServiceNode added = addControllerService(group, proposed, synchronizationOptions.getComponentIdGenerator(), topLevelGroup);
if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
servicesToRestart.add(added);
@ -1194,7 +1200,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
LOG.info("Successfully synchronized {} by adding it to the flow", added);
} else {
updateControllerService(controllerService, proposed);
updateControllerService(controllerService, proposed, topLevelGroup);
if (proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.ENABLED) {
servicesToRestart.add(controllerService);
@ -1250,7 +1256,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
}
private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed, final ProcessGroup topLevelGroup) {
LOG.debug("Updating {}", service);
service.pauseValidationTrigger();
@ -1275,7 +1281,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values());
final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup(), topLevelGroup);
service.setProperties(properties, true, sensitiveDynamicPropertyNames);
} finally {
@ -1313,7 +1319,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
private Map<String, String> populatePropertiesMap(final ComponentNode componentNode, final Map<String, String> proposedProperties,
final Map<String, VersionedPropertyDescriptor> proposedPropertyDescriptors, final ProcessGroup group) {
final Map<String, VersionedPropertyDescriptor> proposedPropertyDescriptors,
final ProcessGroup group, final ProcessGroup topLevelGroup) {
// Explicitly set all existing properties to null, except for sensitive properties, so that if there isn't an entry in the proposedProperties
// it will get removed from the processor. We don't do this for sensitive properties because when we retrieve the VersionedProcessGroup from registry,
@ -1348,7 +1355,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
String existingExternalServiceId = null;
final String componentDescriptorValue = componentNode.getEffectivePropertyValue(descriptor);
if (componentDescriptorValue != null) {
final ProcessGroup parentGroup = group.getParent();
final ProcessGroup parentGroup = topLevelGroup.getParent();
if (parentGroup != null) {
final ControllerServiceNode serviceNode = parentGroup.findControllerService(componentDescriptorValue, false, true);
if (serviceNode != null) {
@ -2393,7 +2400,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
}
private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final ComponentIdGenerator componentIdGenerator) throws ProcessorInstantiationException {
private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final ComponentIdGenerator componentIdGenerator,
final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
final String identifier = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
LOG.debug("Adding Processor with ID {} of type {}", identifier, proposed.getType());
@ -2402,7 +2410,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
procNode.setVersionedComponentId(proposed.getIdentifier());
destination.addProcessor(procNode);
updateProcessor(procNode, proposed);
updateProcessor(procNode, proposed, topLevelGroup);
// Notify the processor node that the configuration (properties, e.g.) has been restored
final ProcessContext processContext = context.getProcessContextFactory().apply(procNode);
@ -2485,6 +2493,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
verifyCanSynchronize(processor, proposedProcessor, timeout);
try {
final ProcessGroup topLevelGroup = synchronizationOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(synchronizationOptions.getTopLevelGroupId()) : group;
if (proposedProcessor == null) {
final Set<Connectable> stoppedDownstream = stopDownstreamComponents(processor, timeout, synchronizationOptions);
toRestart.addAll(stoppedDownstream);
@ -2492,10 +2501,10 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
processor.getProcessGroup().removeProcessor(processor);
LOG.info("Successfully synchronized {} by removing it from the flow", processor);
} else if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator());
final ProcessorNode added = addProcessor(group, proposedProcessor, synchronizationOptions.getComponentIdGenerator(), topLevelGroup);
LOG.info("Successfully synchronized {} by adding it to the flow", added);
} else {
updateProcessor(processor, proposedProcessor);
updateProcessor(processor, proposedProcessor, topLevelGroup);
LOG.info("Successfully synchronized {} by updating it to match proposed version", processor);
}
} catch (final Exception e) {
@ -2723,7 +2732,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
LOG.debug("Updating Processor {}", processor);
processor.pauseValidationTrigger();
@ -2742,7 +2751,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(processor, proposed.getProperties(), proposed.getPropertyDescriptors().values());
final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup(), topLevelGroup);
processor.setProperties(properties, true, sensitiveDynamicPropertyNames);
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));

View File

@ -36,6 +36,7 @@ public class FlowSynchronizationOptions {
private final Duration componentStopTimeout;
private final ComponentStopTimeoutAction timeoutAction;
private final ScheduledStateChangeListener scheduledStateChangeListener;
private final String topLevelGroupId;
private FlowSynchronizationOptions(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator;
@ -51,6 +52,7 @@ public class FlowSynchronizationOptions {
this.componentStopTimeout = builder.componentStopTimeout;
this.timeoutAction = builder.timeoutAction;
this.scheduledStateChangeListener = builder.scheduledStateChangeListener;
this.topLevelGroupId = builder.topLevelGroupId;
}
public ComponentIdGenerator getComponentIdGenerator() {
@ -105,6 +107,10 @@ public class FlowSynchronizationOptions {
return scheduledStateChangeListener;
}
public String getTopLevelGroupId() {
return topLevelGroupId;
}
public static class Builder {
private ComponentIdGenerator componentIdGenerator;
private Function<VersionedComponent, String> componentComparisonIdLookup;
@ -119,7 +125,7 @@ public class FlowSynchronizationOptions {
private PropertyDecryptor propertyDecryptor = value -> value;
private Duration componentStopTimeout = Duration.ofSeconds(30);
private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION;
private String topLevelGroupId;
/**
* Specifies the Component ID Generator to use for generating UUID's of components that are to be added to a ProcessGroup
@ -281,6 +287,16 @@ public class FlowSynchronizationOptions {
return new FlowSynchronizationOptions(this);
}
/**
* Specifies the identifier of the top level group that scopes the synchronization.
* @param topLevelGroupId the top level group id
* @return the builder
*/
public Builder topLevelGroupId(final String topLevelGroupId) {
this.topLevelGroupId = topLevelGroupId;
return this;
}
public static Builder from(final FlowSynchronizationOptions options) {
final Builder builder = new Builder();
builder.componentIdGenerator = options.getComponentIdGenerator();
@ -296,6 +312,7 @@ public class FlowSynchronizationOptions {
builder.componentStopTimeout = options.getComponentStopTimeout();
builder.timeoutAction = options.getComponentStopTimeoutAction();
builder.scheduledStateChangeListener = options.getScheduledStateChangeListener();
builder.topLevelGroupId = options.getTopLevelGroupId();
return builder;
}