NIFI-2215: Added support for onScheduled and onStopped in InvokeScriptedProcessor

NIFI-2215: Incorporated review comments

This closes #3370

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Matthew Burgess 2019-03-13 18:10:51 -04:00 committed by Mike Thomsen
parent 25d8f64bed
commit f40b7fc6ba
2 changed files with 51 additions and 3 deletions

View File

@ -68,7 +68,9 @@ import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;
@CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define " @CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define "
+ "a valid class that implements the Processor interface, and it must set a variable 'processor' to an instance of " + "a valid class that implements the Processor interface, and it must set a variable 'processor' to an instance of "
+ "the class. Processor methods such as onTrigger() will be delegated to the scripted Processor instance. Also any " + "the class. Processor methods such as onTrigger() will be delegated to the scripted Processor instance. Also any "
+ "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. " + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. The scripted processor can "
+ "implement public void setLogger(ComponentLog logger) to get access to the parent logger, as well as public void onScheduled(ProcessContext context) and "
+ "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. "
+ "Experimental: Impact of sustained usage not yet verified.") + "Experimental: Impact of sustained usage not yet verified.")
@DynamicProperty(name = "A script engine property to update", value = "The value to set it to", @DynamicProperty(name = "A script engine property to update", value = "The value to set it to",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
@ -205,6 +207,8 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
public void setup(final ProcessContext context) { public void setup(final ProcessContext context) {
scriptingComponentHelper.setupVariables(context); scriptingComponentHelper.setupVariables(context);
setup(); setup();
invokeScriptedProcessorMethod("onScheduled", context);
} }
public void setup() { public void setup() {
@ -564,9 +568,36 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
} }
@OnStopped @OnStopped
public void stop() { public void stop(ProcessContext context) {
invokeScriptedProcessorMethod("onStopped", context);
scriptingComponentHelper.stop(); scriptingComponentHelper.stop();
processor.set(null); processor.set(null);
scriptEngine = null; scriptEngine = null;
} }
private void invokeScriptedProcessorMethod(String methodName, Object... params) {
// Run the scripted processor's method here, if it exists
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
final Object obj = scriptEngine.get("processor");
if (obj != null) {
ComponentLog logger = getLogger();
try {
invocable.invokeMethod(obj, methodName, params);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Configured script Processor does not contain the method " + methodName);
}
} catch (final Exception e) {
// An error occurred during onScheduled, propagate it up
logger.error("Error while executing the scripted processor's method " + methodName, e);
if (e instanceof ProcessException) {
throw (ProcessException) e;
}
throw new ProcessException(e);
}
}
}
}
} }

View File

@ -24,11 +24,28 @@ class GroovyProcessor implements Processor {
def descriptor = new PropertyDescriptor.Builder() def descriptor = new PropertyDescriptor.Builder()
.name("test-attribute").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build() .name("test-attribute").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
def logger
def setAttributeFromThisInOnScheduled = ''
@Override @Override
void initialize(ProcessorInitializationContext context) { void initialize(ProcessorInitializationContext context) {
} }
void setLogger(log) {
logger = log
}
void onScheduled(ProcessContext context) {
// Set the attribute value for use in onTrigger
setAttributeFromThisInOnScheduled = 'test content'
}
void onStopped(ProcessContext context) {
logger.info("Called onStopped")
}
@Override @Override
Set<Relationship> getRelationships() { Set<Relationship> getRelationships() {
return [REL_TEST] as Set return [REL_TEST] as Set
@ -41,7 +58,7 @@ class GroovyProcessor implements Processor {
if (flowFile == null) { if (flowFile == null) {
return; return;
} }
flowFile = session.putAttribute(flowFile, "from-content", "test content") flowFile = session.putAttribute(flowFile, 'from-content', setAttributeFromThisInOnScheduled)
// transfer // transfer
session.transfer(flowFile, REL_TEST) session.transfer(flowFile, REL_TEST)
session.commit() session.commit()