mirror of https://github.com/apache/nifi.git
NIFI-7012: Refactored OnConfigurationRestored to support sensitive property validation (#5415)
This commit is contained in:
parent
ca530f40d8
commit
104078868e
|
@ -17,6 +17,9 @@
|
|||
|
||||
package org.apache.nifi.annotation.lifecycle;
|
||||
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Inherited;
|
||||
|
@ -32,7 +35,11 @@ import java.lang.annotation.Target;
|
|||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Methods with this annotation must take zero arguments.
|
||||
* Methods with this annotation are permitted to take no arguments or to take a
|
||||
* single argument. If using a single argument, that argument must be of type
|
||||
* {@link ConfigurationContext} if the component is a ReportingTask or a
|
||||
* ControllerService. If the component is a Processor, then the argument must be
|
||||
* of type {@link ProcessContext}.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
|
|
|
@ -135,8 +135,6 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
}
|
||||
|
||||
triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class);
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -195,6 +193,10 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
|
||||
context.assertValid();
|
||||
context.enableExpressionValidation();
|
||||
|
||||
// Call onConfigurationRestored here, right before the test run, as all properties should have been set byt this point.
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, this.context);
|
||||
|
||||
try {
|
||||
if (initialize) {
|
||||
try {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
|||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.DeprecationNotice;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
|
@ -1912,4 +1913,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationRestored(final ProcessContext context) {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getProcessor().getClass(), getProcessor().getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, getProcessor(), context);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4989,6 +4989,10 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
|
||||
destination.addProcessor(procNode);
|
||||
updateProcessor(procNode, proposed);
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor,
|
||||
stateManagerProvider.getStateManager(procNode.getProcessor().getIdentifier()), () -> false, nodeTypeProvider);
|
||||
procNode.onConfigurationRestored(processContext);
|
||||
|
||||
return procNode;
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -330,17 +331,6 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final ParameterContext parameterContext = getParameterContext();
|
||||
if (parameterContext != null) {
|
||||
for (final ParameterReference reference : referenceList) {
|
||||
final Optional<Parameter> parameter = parameterContext.getParameter(reference.getParameterName());
|
||||
if (parameter.isPresent() && parameter.get().getDescriptor().isSensitive()) {
|
||||
throw new IllegalArgumentException("The property '" + descriptor.getDisplayName() + "' cannot reference Parameter '" + parameter.get().getDescriptor().getName()
|
||||
+ "' because Sensitive Parameters may only be referenced by Sensitive Properties.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (descriptor.getControllerServiceDefinition() != null) {
|
||||
|
@ -550,15 +540,15 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
|
||||
@Override
|
||||
public Map<PropertyDescriptor, String> getRawPropertyValues() {
|
||||
return getPropertyValues(PropertyConfiguration::getRawValue);
|
||||
return getPropertyValues((descriptor, config) -> config.getRawValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<PropertyDescriptor, String> getEffectivePropertyValues() {
|
||||
return getPropertyValues(config -> config.getEffectiveValue(getParameterContext()));
|
||||
return getPropertyValues((descriptor, config) -> getConfigValue(config, isResolveParameter(descriptor, config)));
|
||||
}
|
||||
|
||||
private Map<PropertyDescriptor, String> getPropertyValues(final Function<PropertyConfiguration, String> valueFunction) {
|
||||
private Map<PropertyDescriptor, String> getPropertyValues(final BiFunction<PropertyDescriptor, PropertyConfiguration, String> valueFunction) {
|
||||
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getIdentifier())) {
|
||||
final List<PropertyDescriptor> supported = getComponent().getPropertyDescriptors();
|
||||
|
||||
|
@ -569,7 +559,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
}
|
||||
}
|
||||
|
||||
properties.forEach((descriptor, config) -> props.put(descriptor, valueFunction.apply(config)));
|
||||
properties.forEach((descriptor, config) -> props.put(descriptor, valueFunction.apply(descriptor, config)));
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
@ -782,6 +772,18 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
.explanation("Property references Parameter '" + paramName + "' but the currently selected Parameter Context does not have a Parameter with that name")
|
||||
.build());
|
||||
}
|
||||
final Optional<Parameter> parameterRef = parameterContext.getParameter(paramName);
|
||||
if (parameterRef.isPresent()) {
|
||||
final ParameterDescriptor parameterDescriptor = parameterRef.get().getDescriptor();
|
||||
if (parameterDescriptor.isSensitive() != propertyDescriptor.isSensitive()) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(propertyDescriptor.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("The property '" + propertyDescriptor.getDisplayName() + "' cannot reference Parameter '" + parameterDescriptor.getName()
|
||||
+ "' because the Sensitivity of the parameter does not match the Sensitivity of the property.")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1243,6 +1245,40 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
this.additionalResourcesFingerprint = additionalResourcesFingerprint;
|
||||
}
|
||||
|
||||
// Determine whether the property value should be evaluated in terms of the parameter context or not.
|
||||
// If the sensitivity of the property does not match the sensitivity of the parameter, the literal value will be returned
|
||||
//
|
||||
// Examples when SensitiveParam value = 'abc' and MY_PROP is non-sensitive:
|
||||
// SensitiveProp --> 'abc'
|
||||
// NonSensitiveProp --> '#{SensitiveParam}'
|
||||
// context.getProperty(MY_PROP).getValue(); '#{SensitiveParam}'
|
||||
private boolean isResolveParameter(final PropertyDescriptor descriptor, final PropertyConfiguration config) {
|
||||
boolean okToResolve = true;
|
||||
|
||||
final ParameterContext context = getParameterContext();
|
||||
if (context == null) {
|
||||
return false;
|
||||
}
|
||||
for (final ParameterReference reference : config.getParameterReferences()) {
|
||||
final String parameterName = reference.getParameterName();
|
||||
final Optional<Parameter> optionalParameter = context.getParameter(parameterName);
|
||||
if (optionalParameter.isPresent()) {
|
||||
final boolean paramIsSensitive = optionalParameter.get().getDescriptor().isSensitive();
|
||||
if (paramIsSensitive != descriptor.isSensitive()) {
|
||||
okToResolve = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return okToResolve;
|
||||
}
|
||||
|
||||
// Evaluates the parameter value if it is ok to do so, otherwise return the raw "${param}" literal.
|
||||
// This is done to prevent evaluation of a sensitive parameter when setting a non-sensitive property.
|
||||
private String getConfigValue(final PropertyConfiguration config, final boolean okToResolve) {
|
||||
return okToResolve ? config.getEffectiveValue(getParameterContext()) : config.getRawValue();
|
||||
}
|
||||
|
||||
protected abstract ParameterContext getParameterContext();
|
||||
|
||||
}
|
||||
|
|
|
@ -290,4 +290,11 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
|
|||
* @return the desired state for this Processor
|
||||
*/
|
||||
public abstract ScheduledState getDesiredState();
|
||||
|
||||
/**
|
||||
* This method will be called once the processor's configuration has been restored (on startup, reload, e.g.)
|
||||
*
|
||||
* @param context The ProcessContext associated with the Processor configuration
|
||||
*/
|
||||
public abstract void onConfigurationRestored(ProcessContext context);
|
||||
}
|
||||
|
|
|
@ -113,6 +113,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer;
|
|||
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
|
||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.controller.service.StandardConfigurationContext;
|
||||
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
|
||||
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
|
||||
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
|
||||
|
@ -152,6 +153,7 @@ import org.apache.nifi.parameter.ParameterLookup;
|
|||
import org.apache.nifi.parameter.StandardParameterContextManager;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.StandardProcessContext;
|
||||
import org.apache.nifi.provenance.ComponentIdentifierLookup;
|
||||
import org.apache.nifi.provenance.IdentifierLookup;
|
||||
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
|
||||
|
@ -985,7 +987,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
|
||||
final Processor processor = procNode.getProcessor();
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor,
|
||||
getStateManagerProvider().getStateManager(processor.getIdentifier()), () -> false, this);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor, processContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -993,7 +997,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
final ControllerService service = serviceNode.getControllerServiceImplementation();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1001,7 +1006,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
final ReportingTask task = taskNode.getReportingTask();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, task.getClass(), task.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task, taskNode.getConfigurationContext());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,10 +38,11 @@ public interface FlowSnippet {
|
|||
* Instantiates this snippet, adding it to the given Process Group
|
||||
*
|
||||
* @param flowManager the FlowManager
|
||||
* @param flowController the FlowController
|
||||
* @param group the group to add the snippet to
|
||||
* @throws ProcessorInstantiationException if unable to instantiate any of the Processors within the snippet
|
||||
* @throws org.apache.nifi.controller.exception.ControllerServiceInstantiationException if unable to instantiate any of the Controller Services within the snippet
|
||||
*/
|
||||
void instantiate(FlowManager flowManager, ProcessGroup group) throws ProcessorInstantiationException;
|
||||
void instantiate(FlowManager flowManager, FlowController flowController, ProcessGroup group) throws ProcessorInstantiationException;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.nifi.nar.ExtensionManager;
|
|||
import org.apache.nifi.parameter.ParameterContext;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.StandardProcessContext;
|
||||
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
|
||||
import org.apache.nifi.registry.flow.VersionControlInformation;
|
||||
import org.apache.nifi.remote.PublicPort;
|
||||
|
@ -151,8 +152,8 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||
}
|
||||
}
|
||||
|
||||
public void instantiate(final FlowManager flowManager, final ProcessGroup group) throws ProcessorInstantiationException {
|
||||
instantiate(flowManager, group, true);
|
||||
public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group) throws ProcessorInstantiationException {
|
||||
instantiate(flowManager, flowController, group, true);
|
||||
}
|
||||
|
||||
|
||||
|
@ -221,7 +222,7 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||
}
|
||||
|
||||
|
||||
public void instantiate(final FlowManager flowManager, final ProcessGroup group, final boolean topLevel) {
|
||||
public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group, final boolean topLevel) {
|
||||
//
|
||||
// Instantiate Controller Services
|
||||
//
|
||||
|
@ -406,6 +407,11 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||
procNode.setProperties(config.getProperties());
|
||||
}
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController);
|
||||
procNode.onConfigurationRestored(processContext);
|
||||
|
||||
group.addProcessor(procNode);
|
||||
} finally {
|
||||
procNode.resumeValidationTrigger();
|
||||
|
@ -526,7 +532,7 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||
childTemplateDTO.setControllerServices(contents.getControllerServices());
|
||||
|
||||
final StandardFlowSnippet childSnippet = new StandardFlowSnippet(childTemplateDTO, extensionManager);
|
||||
childSnippet.instantiate(flowManager, childGroup, false);
|
||||
childSnippet.instantiate(flowManager, flowController, childGroup, false);
|
||||
|
||||
if (groupDTO.getVersionControlInformation() != null) {
|
||||
final VersionControlInformation vci = StandardVersionControlInformation.Builder
|
||||
|
|
|
@ -97,6 +97,11 @@ public class StandardReloadComponent implements ReloadComponent {
|
|||
// need to refresh the properties in case we are changing from ghost component to real component
|
||||
existingNode.refreshProperties();
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
flowController.getStateManagerProvider().getStateManager(existingNode.getProcessor().getIdentifier()), () -> false, flowController);
|
||||
existingNode.onConfigurationRestored(processContext);
|
||||
|
||||
logger.debug("Triggering async validation of {} due to processor reload", existingNode);
|
||||
flowController.getValidationTrigger().trigger(existingNode);
|
||||
}
|
||||
|
|
|
@ -271,7 +271,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
|||
|
||||
final FlowSnippet snippet = new StandardFlowSnippet(dto, flowController.getExtensionManager());
|
||||
snippet.validate(group);
|
||||
snippet.instantiate(this, group);
|
||||
snippet.instantiate(this, flowController, group);
|
||||
|
||||
group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
|
||||
}
|
||||
|
@ -342,12 +342,6 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
|||
}
|
||||
throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
|
||||
}
|
||||
|
||||
if (flowController.isInitialized()) {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return procNode;
|
||||
|
@ -393,7 +387,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
|||
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
|
||||
|
||||
if (flowController.isInitialized()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
|
||||
|
@ -497,7 +491,9 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
|||
|
||||
if (flowController.isInitialized()) {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
final ConfigurationContext configurationContext =
|
||||
new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, flowController.getVariableRegistry());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -125,6 +125,11 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
// configure the processor
|
||||
configureProcessor(processor, processorDTO);
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(processor, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
|
||||
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()), () -> false, flowController);
|
||||
processor.onConfigurationRestored(processContext);
|
||||
|
||||
return processor;
|
||||
} catch (IllegalStateException | ComponentLifeCycleException ise) {
|
||||
throw new NiFiCoreException(ise.getMessage(), ise);
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -73,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
+ "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. The scripted processor can "
|
||||
+ "implement public void setLogger(ComponentLog logger) to get access to the parent logger, as well as public void onScheduled(ProcessContext context) and "
|
||||
+ "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. "
|
||||
+ "NOTE: The script will be loaded when the processor is populated with property values, see the Restrictions section for more security implications. "
|
||||
+ "Experimental: Impact of sustained usage not yet verified.")
|
||||
@DynamicProperty(name = "A script engine property to update", value = "The value to set it to",
|
||||
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
|
||||
|
@ -212,6 +214,12 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|
|||
invokeScriptedProcessorMethod("onScheduled", context);
|
||||
}
|
||||
|
||||
@OnConfigurationRestored
|
||||
public void onConfigurationRestored(final ProcessContext context) {
|
||||
scriptingComponentHelper.setupVariables(context);
|
||||
setup();
|
||||
}
|
||||
|
||||
public void setup() {
|
||||
if (scriptNeedsReload.get() || processor.get() == null) {
|
||||
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
|
||||
|
@ -242,8 +250,24 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|
|||
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|
||||
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|
||||
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
|
||||
|
||||
// Update the ScriptingComponentHelper's value(s)
|
||||
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)) {
|
||||
scriptingComponentHelper.setScriptPath(newValue);
|
||||
} else if (ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)) {
|
||||
scriptingComponentHelper.setScriptBody(newValue);
|
||||
} else if (ScriptingComponentUtils.MODULES.equals(descriptor)) {
|
||||
scriptingComponentHelper.setScriptBody(newValue);
|
||||
} else if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
|
||||
scriptingComponentHelper.setScriptEngineName(newValue);
|
||||
}
|
||||
|
||||
scriptNeedsReload.set(true);
|
||||
scriptRunner = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time.
|
||||
if (isConfigurationRestored()) {
|
||||
// Once the configuration has been restored, each call to onPropertyModified() is due to a change made after the processor was loaded, so reload the script
|
||||
setup();
|
||||
}
|
||||
} else if (instance != null) {
|
||||
// If the script provides a Processor, call its onPropertyModified() method
|
||||
try {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.script;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -24,6 +25,7 @@ import org.apache.nifi.controller.AbstractControllerService;
|
|||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.script.ScriptRunner;
|
||||
|
||||
|
@ -116,6 +118,12 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
|
|||
}
|
||||
}
|
||||
|
||||
@OnConfigurationRestored
|
||||
public void onConfigurationRestored(final ProcessContext context) {
|
||||
scriptingComponentHelper.setupVariables(context);
|
||||
setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
|
||||
|
|
|
@ -18,7 +18,9 @@ package org.apache.nifi.processors.script;
|
|||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
|
@ -140,14 +142,14 @@ public class TestInvokeGroovy extends BaseScriptTest {
|
|||
*/
|
||||
@Test
|
||||
public void testInvokeScriptCausesException() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new OverrideInvokeScriptedProcessor());
|
||||
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
|
||||
TEST_RESOURCE_LOCATION + "groovy/testInvokeScriptCausesException.groovy")
|
||||
);
|
||||
runner.assertValid();
|
||||
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
|
||||
assertThrows(AssertionError.class, () -> runner.run());
|
||||
assertThrows(AssertionError.class, runner::run);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -198,7 +200,7 @@ public class TestInvokeGroovy extends BaseScriptTest {
|
|||
|
||||
runner.assertAllFlowFilesTransferred("success", 1);
|
||||
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
|
||||
assertTrue(result.size() == 1);
|
||||
assertEquals(1, result.size());
|
||||
final String expectedOutput = new String(Hex.encodeHex(MessageDigestUtils.getDigest("testbla bla".getBytes())));
|
||||
final MockFlowFile outputFlowFile = result.get(0);
|
||||
outputFlowFile.assertContentEquals(expectedOutput);
|
||||
|
@ -238,4 +240,22 @@ public class TestInvokeGroovy extends BaseScriptTest {
|
|||
MockFlowFile ff = result.get(0);
|
||||
ff.assertContentEquals("48\n47\n14\n");
|
||||
}
|
||||
|
||||
private static class OverrideInvokeScriptedProcessor extends InvokeScriptedProcessor {
|
||||
|
||||
private int numTimesModifiedCalled = 0;
|
||||
|
||||
@OnConfigurationRestored
|
||||
@Override
|
||||
public void onConfigurationRestored(ProcessContext context) {
|
||||
super.onConfigurationRestored(context);
|
||||
assertEquals(this.getSupportedPropertyDescriptors().size(), numTimesModifiedCalled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
super.onPropertyModified(descriptor, oldValue, newValue);
|
||||
numTimesModifiedCalled++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,6 @@ public class TestInvokeJython extends BaseScriptTest {
|
|||
@Test
|
||||
public void testInvalidThenFixed() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
|
|||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.nifi.connectable.Funnel;
|
|||
import org.apache.nifi.connectable.LocalPort;
|
||||
import org.apache.nifi.connectable.Port;
|
||||
import org.apache.nifi.connectable.StandardConnection;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ProcessScheduler;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
|
@ -47,6 +49,7 @@ import org.apache.nifi.controller.queue.LoadBalanceStrategy;
|
|||
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
|
||||
import org.apache.nifi.controller.repository.FlowFileEventRepository;
|
||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||
import org.apache.nifi.controller.service.StandardConfigurationContext;
|
||||
import org.apache.nifi.flowfile.FlowFilePrioritizer;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.groups.RemoteProcessGroup;
|
||||
|
@ -61,6 +64,7 @@ import org.apache.nifi.nar.ExtensionManager;
|
|||
import org.apache.nifi.nar.NarCloseable;
|
||||
import org.apache.nifi.parameter.ParameterContextManager;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.StandardProcessContext;
|
||||
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
|
||||
import org.apache.nifi.registry.variable.MutableVariableRegistry;
|
||||
import org.apache.nifi.remote.StandardRemoteProcessGroup;
|
||||
|
@ -174,7 +178,10 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
|
|||
}
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
|
||||
final StateManager stateManager = statelessEngine.getStateManagerProvider().getStateManager(id);
|
||||
final StandardProcessContext processContext = new StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(),
|
||||
statelessEngine.getPropertyEncryptor(), stateManager, () -> false, new StatelessNodeTypeProvider());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor(), processContext);
|
||||
}
|
||||
|
||||
LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger());
|
||||
|
@ -292,7 +299,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
|
|||
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
|
||||
|
||||
if (isFlowInitialized()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
|
||||
|
@ -343,7 +350,9 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
|
|||
final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
final ConfigurationContext configurationContext =
|
||||
new StandardConfigurationContext(serviceNode, statelessEngine.getControllerServiceProvider(), null, statelessEngine.getRootVariableRegistry());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service, configurationContext);
|
||||
}
|
||||
|
||||
final ControllerService serviceImpl = serviceNode.getControllerServiceImplementation();
|
||||
|
|
|
@ -102,6 +102,11 @@ public class StatelessReloadComponent implements ReloadComponent {
|
|||
// need to refresh the properties in case we are changing from ghost component to real component
|
||||
existingNode.refreshProperties();
|
||||
|
||||
// Notify the processor node that the configuration (properties, e.g.) has been restored
|
||||
final StandardProcessContext processContext = new StandardProcessContext(existingNode, statelessEngine.getControllerServiceProvider(),
|
||||
statelessEngine.getPropertyEncryptor(), statelessEngine.getStateManagerProvider().getStateManager(id), () -> false, new StatelessNodeTypeProvider());
|
||||
existingNode.onConfigurationRestored(processContext);
|
||||
|
||||
logger.debug("Successfully reloaded {}", existingNode);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue