diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java index eaa89664b9..72c5bd3043 100644 --- a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnConfigurationRestored.java @@ -17,6 +17,9 @@ package org.apache.nifi.annotation.lifecycle; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.ProcessContext; + import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; @@ -32,7 +35,11 @@ import java.lang.annotation.Target; *

* *

- * Methods with this annotation must take zero arguments. + * Methods with this annotation are permitted to take no arguments or to take a + * single argument. If using a single argument, that argument must be of type + * {@link ConfigurationContext} if the component is a ReportingTask or a + * ControllerService. If the component is a Processor, then the argument must be + * of type {@link ProcessContext}. *

* *

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index d489177e79..7de3bbe6d7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -135,8 +135,6 @@ public class StandardProcessorTestRunner implements TestRunner { } triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class); - - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor); } @Override @@ -195,6 +193,10 @@ public class StandardProcessorTestRunner implements TestRunner { context.assertValid(); context.enableExpressionValidation(); + + // Call onConfigurationRestored here, right before the test run, as all properties should have been set byt this point. + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, this.context); + try { if (initialize) { try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 875cbbde0b..a23fd0e4b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.DeprecationNotice; +import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -1912,4 +1913,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } } + + @Override + public void onConfigurationRestored(final ProcessContext context) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getProcessor().getClass(), getProcessor().getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, getProcessor(), context); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 880c6d269a..7b32364147 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -4989,6 +4989,10 @@ public final class StandardProcessGroup implements ProcessGroup { destination.addProcessor(procNode); updateProcessor(procNode, proposed); + // Notify the processor node that the configuration (properties, e.g.) has been restored + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor, + stateManagerProvider.getStateManager(procNode.getProcessor().getIdentifier()), () -> false, nodeTypeProvider); + procNode.onConfigurationRestored(processContext); return procNode; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 7c5165feb6..f4977cc297 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -330,17 +331,6 @@ public abstract class AbstractComponentNode implements ComponentNode { } } } - } else { - final ParameterContext parameterContext = getParameterContext(); - if (parameterContext != null) { - for (final ParameterReference reference : referenceList) { - final Optional parameter = parameterContext.getParameter(reference.getParameterName()); - if (parameter.isPresent() && parameter.get().getDescriptor().isSensitive()) { - throw new IllegalArgumentException("The property '" + descriptor.getDisplayName() + "' cannot reference Parameter '" + parameter.get().getDescriptor().getName() - + "' because Sensitive Parameters may only be referenced by Sensitive Properties."); - } - } - } } if (descriptor.getControllerServiceDefinition() != null) { @@ -550,15 +540,15 @@ public abstract class AbstractComponentNode implements ComponentNode { @Override public Map getRawPropertyValues() { - return getPropertyValues(PropertyConfiguration::getRawValue); + return getPropertyValues((descriptor, config) -> config.getRawValue()); } @Override public Map getEffectivePropertyValues() { - return getPropertyValues(config -> config.getEffectiveValue(getParameterContext())); + return getPropertyValues((descriptor, config) -> getConfigValue(config, isResolveParameter(descriptor, config))); } - private Map getPropertyValues(final Function valueFunction) { + private Map getPropertyValues(final BiFunction valueFunction) { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getIdentifier())) { final List supported = getComponent().getPropertyDescriptors(); @@ -569,7 +559,7 @@ public abstract class AbstractComponentNode implements ComponentNode { } } - properties.forEach((descriptor, config) -> props.put(descriptor, valueFunction.apply(config))); + properties.forEach((descriptor, config) -> props.put(descriptor, valueFunction.apply(descriptor, config))); return props; } } @@ -777,10 +767,22 @@ public abstract class AbstractComponentNode implements ComponentNode { for (final String paramName : referencedParameters) { if (!validationContext.isParameterDefined(paramName)) { results.add(new ValidationResult.Builder() - .subject(propertyDescriptor.getDisplayName()) - .valid(false) - .explanation("Property references Parameter '" + paramName + "' but the currently selected Parameter Context does not have a Parameter with that name") - .build()); + .subject(propertyDescriptor.getDisplayName()) + .valid(false) + .explanation("Property references Parameter '" + paramName + "' but the currently selected Parameter Context does not have a Parameter with that name") + .build()); + } + final Optional parameterRef = parameterContext.getParameter(paramName); + if (parameterRef.isPresent()) { + final ParameterDescriptor parameterDescriptor = parameterRef.get().getDescriptor(); + if (parameterDescriptor.isSensitive() != propertyDescriptor.isSensitive()) { + results.add(new ValidationResult.Builder() + .subject(propertyDescriptor.getDisplayName()) + .valid(false) + .explanation("The property '" + propertyDescriptor.getDisplayName() + "' cannot reference Parameter '" + parameterDescriptor.getName() + + "' because the Sensitivity of the parameter does not match the Sensitivity of the property.") + .build()); + } } } } @@ -1243,6 +1245,40 @@ public abstract class AbstractComponentNode implements ComponentNode { this.additionalResourcesFingerprint = additionalResourcesFingerprint; } + // Determine whether the property value should be evaluated in terms of the parameter context or not. + // If the sensitivity of the property does not match the sensitivity of the parameter, the literal value will be returned + // + // Examples when SensitiveParam value = 'abc' and MY_PROP is non-sensitive: + // SensitiveProp --> 'abc' + // NonSensitiveProp --> '#{SensitiveParam}' + // context.getProperty(MY_PROP).getValue(); '#{SensitiveParam}' + private boolean isResolveParameter(final PropertyDescriptor descriptor, final PropertyConfiguration config) { + boolean okToResolve = true; + + final ParameterContext context = getParameterContext(); + if (context == null) { + return false; + } + for (final ParameterReference reference : config.getParameterReferences()) { + final String parameterName = reference.getParameterName(); + final Optional optionalParameter = context.getParameter(parameterName); + if (optionalParameter.isPresent()) { + final boolean paramIsSensitive = optionalParameter.get().getDescriptor().isSensitive(); + if (paramIsSensitive != descriptor.isSensitive()) { + okToResolve = false; + break; + } + } + } + return okToResolve; + } + + // Evaluates the parameter value if it is ok to do so, otherwise return the raw "${param}" literal. + // This is done to prevent evaluation of a sensitive parameter when setting a non-sensitive property. + private String getConfigValue(final PropertyConfiguration config, final boolean okToResolve) { + return okToResolve ? config.getEffectiveValue(getParameterContext()) : config.getRawValue(); + } + protected abstract ParameterContext getParameterContext(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 053e97ddf0..61c66225a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -290,4 +290,11 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * @return the desired state for this Processor */ public abstract ScheduledState getDesiredState(); + + /** + * This method will be called once the processor's configuration has been restored (on startup, reload, e.g.) + * + * @param context The ProcessContext associated with the Processor configuration + */ + public abstract void onConfigurationRestored(ProcessContext context); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b409103013..cbf9cbec25 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -113,6 +113,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.serialization.ScheduledStateLookup; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; @@ -152,6 +153,7 @@ import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.parameter.StandardParameterContextManager; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.provenance.ComponentIdentifierLookup; import org.apache.nifi.provenance.IdentifierLookup; import org.apache.nifi.provenance.ProvenanceAuthorizableFactory; @@ -985,7 +987,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) { final Processor processor = procNode.getProcessor(); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor); + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor, + getStateManagerProvider().getStateManager(processor.getIdentifier()), () -> false, this); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, processContext); } } @@ -993,7 +997,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node final ControllerService service = serviceNode.getControllerServiceImplementation(); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service); + final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext); } } @@ -1001,7 +1006,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node final ReportingTask task = taskNode.getReportingTask(); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, task.getClass(), task.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task, taskNode.getConfigurationContext()); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java index 54c0b02b49..52cecd8be5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSnippet.java @@ -38,10 +38,11 @@ public interface FlowSnippet { * Instantiates this snippet, adding it to the given Process Group * * @param flowManager the FlowManager + * @param flowController the FlowController * @param group the group to add the snippet to * @throws ProcessorInstantiationException if unable to instantiate any of the Processors within the snippet * @throws org.apache.nifi.controller.exception.ControllerServiceInstantiationException if unable to instantiate any of the Controller Services within the snippet */ - void instantiate(FlowManager flowManager, ProcessGroup group) throws ProcessorInstantiationException; + void instantiate(FlowManager flowManager, FlowController flowController, ProcessGroup group) throws ProcessorInstantiationException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java index f98975cd2b..8ba67dc4a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java @@ -42,6 +42,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.PublicPort; @@ -151,8 +152,8 @@ public class StandardFlowSnippet implements FlowSnippet { } } - public void instantiate(final FlowManager flowManager, final ProcessGroup group) throws ProcessorInstantiationException { - instantiate(flowManager, group, true); + public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group) throws ProcessorInstantiationException { + instantiate(flowManager, flowController, group, true); } @@ -221,7 +222,7 @@ public class StandardFlowSnippet implements FlowSnippet { } - public void instantiate(final FlowManager flowManager, final ProcessGroup group, final boolean topLevel) { + public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group, final boolean topLevel) { // // Instantiate Controller Services // @@ -406,6 +407,11 @@ public class StandardFlowSnippet implements FlowSnippet { procNode.setProperties(config.getProperties()); } + // Notify the processor node that the configuration (properties, e.g.) has been restored + final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(), + flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController); + procNode.onConfigurationRestored(processContext); + group.addProcessor(procNode); } finally { procNode.resumeValidationTrigger(); @@ -526,7 +532,7 @@ public class StandardFlowSnippet implements FlowSnippet { childTemplateDTO.setControllerServices(contents.getControllerServices()); final StandardFlowSnippet childSnippet = new StandardFlowSnippet(childTemplateDTO, extensionManager); - childSnippet.instantiate(flowManager, childGroup, false); + childSnippet.instantiate(flowManager, flowController, childGroup, false); if (groupDTO.getVersionControlInformation() != null) { final VersionControlInformation vci = StandardVersionControlInformation.Builder diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java index 74d5d4f8d4..35bdae1dc1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java @@ -97,6 +97,11 @@ public class StandardReloadComponent implements ReloadComponent { // need to refresh the properties in case we are changing from ghost component to real component existingNode.refreshProperties(); + // Notify the processor node that the configuration (properties, e.g.) has been restored + final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(), + flowController.getStateManagerProvider().getStateManager(existingNode.getProcessor().getIdentifier()), () -> false, flowController); + existingNode.onConfigurationRestored(processContext); + logger.debug("Triggering async validation of {} due to processor reload", existingNode); flowController.getValidationTrigger().trigger(existingNode); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index 01b4871369..6cf1ccf2e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -271,7 +271,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana final FlowSnippet snippet = new StandardFlowSnippet(dto, flowController.getExtensionManager()); snippet.validate(group); - snippet.instantiate(this, group); + snippet.instantiate(this, flowController, group); group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize); } @@ -342,12 +342,6 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana } throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); } - - if (flowController.isInitialized()) { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor()); - } - } } return procNode; @@ -393,7 +387,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask()); if (flowController.isInitialized()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); } } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e); @@ -497,7 +491,9 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana if (flowController.isInitialized()) { try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service); + final ConfigurationContext configurationContext = + new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, flowController.getVariableRegistry()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index d4bc543bb8..3ab11a9bdc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -125,6 +125,11 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { // configure the processor configureProcessor(processor, processorDTO); + // Notify the processor node that the configuration (properties, e.g.) has been restored + final StandardProcessContext processContext = new StandardProcessContext(processor, flowController.getControllerServiceProvider(), flowController.getEncryptor(), + flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()), () -> false, flowController); + processor.onConfigurationRestored(processContext); + return processor; } catch (IllegalStateException | ComponentLifeCycleException ise) { throw new NiFiCoreException(ise.getMessage(), ise); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 48f12ce9d2..65146dff24 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnAdded; +import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -73,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference; + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. The scripted processor can " + "implement public void setLogger(ComponentLog logger) to get access to the parent logger, as well as public void onScheduled(ProcessContext context) and " + "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. " + + "NOTE: The script will be loaded when the processor is populated with property values, see the Restrictions section for more security implications. " + "Experimental: Impact of sustained usage not yet verified.") @DynamicProperty(name = "A script engine property to update", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, @@ -212,6 +214,12 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { invokeScriptedProcessorMethod("onScheduled", context); } + @OnConfigurationRestored + public void onConfigurationRestored(final ProcessContext context) { + scriptingComponentHelper.setupVariables(context); + setup(); + } + public void setup() { if (scriptNeedsReload.get() || processor.get() == null) { if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { @@ -242,8 +250,24 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) || ScriptingComponentUtils.MODULES.equals(descriptor) || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + + // Update the ScriptingComponentHelper's value(s) + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)) { + scriptingComponentHelper.setScriptPath(newValue); + } else if (ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)) { + scriptingComponentHelper.setScriptBody(newValue); + } else if (ScriptingComponentUtils.MODULES.equals(descriptor)) { + scriptingComponentHelper.setScriptBody(newValue); + } else if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptingComponentHelper.setScriptEngineName(newValue); + } + scriptNeedsReload.set(true); scriptRunner = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time. + if (isConfigurationRestored()) { + // Once the configuration has been restored, each call to onPropertyModified() is due to a change made after the processor was loaded, so reload the script + setup(); + } } else if (instance != null) { // If the script provides a Processor, call its onPropertyModified() method try { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java index 8c8a391183..5201fba297 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java @@ -17,6 +17,7 @@ package org.apache.nifi.script; import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -24,6 +25,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.script.ScriptRunner; @@ -116,6 +118,12 @@ public abstract class AbstractScriptedControllerService extends AbstractControll } } + @OnConfigurationRestored + public void onConfigurationRestored(final ProcessContext context) { + scriptingComponentHelper.setupVariables(context); + setup(); + } + @Override protected Collection customValidate(ValidationContext validationContext) { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java index a3e948f38d..bb96578436 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java @@ -18,7 +18,9 @@ package org.apache.nifi.processors.script; import org.apache.commons.codec.binary.Hex; +import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.script.ScriptingComponentUtils; import org.apache.nifi.serialization.record.MockRecordParser; @@ -140,14 +142,14 @@ public class TestInvokeGroovy extends BaseScriptTest { */ @Test public void testInvokeScriptCausesException() { - final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor()); + final TestRunner runner = TestRunners.newTestRunner(new OverrideInvokeScriptedProcessor()); runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy"); runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString( TEST_RESOURCE_LOCATION + "groovy/testInvokeScriptCausesException.groovy") ); runner.assertValid(); runner.enqueue("test content".getBytes(StandardCharsets.UTF_8)); - assertThrows(AssertionError.class, () -> runner.run()); + assertThrows(AssertionError.class, runner::run); } /** @@ -198,7 +200,7 @@ public class TestInvokeGroovy extends BaseScriptTest { runner.assertAllFlowFilesTransferred("success", 1); final List result = runner.getFlowFilesForRelationship("success"); - assertTrue(result.size() == 1); + assertEquals(1, result.size()); final String expectedOutput = new String(Hex.encodeHex(MessageDigestUtils.getDigest("testbla bla".getBytes()))); final MockFlowFile outputFlowFile = result.get(0); outputFlowFile.assertContentEquals(expectedOutput); @@ -238,4 +240,22 @@ public class TestInvokeGroovy extends BaseScriptTest { MockFlowFile ff = result.get(0); ff.assertContentEquals("48\n47\n14\n"); } + + private static class OverrideInvokeScriptedProcessor extends InvokeScriptedProcessor { + + private int numTimesModifiedCalled = 0; + + @OnConfigurationRestored + @Override + public void onConfigurationRestored(ProcessContext context) { + super.onConfigurationRestored(context); + assertEquals(this.getSupportedPropertyDescriptors().size(), numTimesModifiedCalled); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + numTimesModifiedCalled++; + } + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java index bef3fb2fc3..0efbc5f046 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java @@ -67,7 +67,6 @@ public class TestInvokeJython extends BaseScriptTest { @Test public void testInvalidThenFixed() { final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor()); - runner.setValidateExpressionUsage(false); runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python"); runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py"); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java index 737f7373ba..45cd711c79 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java @@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -28,6 +29,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.StandardConnection; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; @@ -47,6 +49,7 @@ import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; @@ -61,6 +64,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.parameter.ParameterContextManager; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.StandardRemoteProcessGroup; @@ -174,7 +178,10 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan } try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor()); + final StateManager stateManager = statelessEngine.getStateManagerProvider().getStateManager(id); + final StandardProcessContext processContext = new StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(), + statelessEngine.getPropertyEncryptor(), stateManager, () -> false, new StatelessNodeTypeProvider()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor(), processContext); } LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger()); @@ -292,7 +299,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask()); if (isFlowInitialized()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); } } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e); @@ -343,7 +350,9 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan final ExtensionManager extensionManager = statelessEngine.getExtensionManager(); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service); + final ConfigurationContext configurationContext = + new StandardConfigurationContext(serviceNode, statelessEngine.getControllerServiceProvider(), null, statelessEngine.getRootVariableRegistry()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext); } final ControllerService serviceImpl = serviceNode.getControllerServiceImplementation(); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java index de4dfb4f28..238842f4c7 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java @@ -102,6 +102,11 @@ public class StatelessReloadComponent implements ReloadComponent { // need to refresh the properties in case we are changing from ghost component to real component existingNode.refreshProperties(); + // Notify the processor node that the configuration (properties, e.g.) has been restored + final StandardProcessContext processContext = new StandardProcessContext(existingNode, statelessEngine.getControllerServiceProvider(), + statelessEngine.getPropertyEncryptor(), statelessEngine.getStateManagerProvider().getStateManager(id), () -> false, new StatelessNodeTypeProvider()); + existingNode.onConfigurationRestored(processContext); + logger.debug("Successfully reloaded {}", existingNode); }