From f40b7fc6bad14a64bce104b3748baa95847a674c Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 13 Mar 2019 18:10:51 -0400 Subject: [PATCH] NIFI-2215: Added support for onScheduled and onStopped in InvokeScriptedProcessor NIFI-2215: Incorporated review comments This closes #3370 Signed-off-by: Mike Thomsen --- .../script/InvokeScriptedProcessor.java | 35 +++++++++++++++++-- .../test/resources/groovy/test_reader.groovy | 19 +++++++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index c2df1282d2..2790f83e32 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -68,7 +68,9 @@ import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter; @CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define " + "a valid class that implements the Processor interface, and it must set a variable 'processor' to an instance of " + "the class. Processor methods such as onTrigger() will be delegated to the scripted Processor instance. Also any " - + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. " + + "Relationships or PropertyDescriptors defined by the scripted processor will be added to the configuration dialog. The scripted processor can " + + "implement public void setLogger(ComponentLog logger) to get access to the parent logger, as well as public void onScheduled(ProcessContext context) and " + + "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. " + "Experimental: Impact of sustained usage not yet verified.") @DynamicProperty(name = "A script engine property to update", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, @@ -205,6 +207,8 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { public void setup(final ProcessContext context) { scriptingComponentHelper.setupVariables(context); setup(); + + invokeScriptedProcessorMethod("onScheduled", context); } public void setup() { @@ -564,9 +568,36 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { } @OnStopped - public void stop() { + public void stop(ProcessContext context) { + invokeScriptedProcessorMethod("onStopped", context); scriptingComponentHelper.stop(); processor.set(null); scriptEngine = null; } + + private void invokeScriptedProcessorMethod(String methodName, Object... params) { + // Run the scripted processor's method here, if it exists + if (scriptEngine instanceof Invocable) { + final Invocable invocable = (Invocable) scriptEngine; + final Object obj = scriptEngine.get("processor"); + if (obj != null) { + + ComponentLog logger = getLogger(); + try { + invocable.invokeMethod(obj, methodName, params); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script Processor does not contain the method " + methodName); + } + } catch (final Exception e) { + // An error occurred during onScheduled, propagate it up + logger.error("Error while executing the scripted processor's method " + methodName, e); + if (e instanceof ProcessException) { + throw (ProcessException) e; + } + throw new ProcessException(e); + } + } + } + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy index 414b9dec0b..5d54701d4b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy @@ -24,11 +24,28 @@ class GroovyProcessor implements Processor { def descriptor = new PropertyDescriptor.Builder() .name("test-attribute").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build() + def logger + + def setAttributeFromThisInOnScheduled = '' + @Override void initialize(ProcessorInitializationContext context) { } + void setLogger(log) { + logger = log + } + + void onScheduled(ProcessContext context) { + // Set the attribute value for use in onTrigger + setAttributeFromThisInOnScheduled = 'test content' + } + + void onStopped(ProcessContext context) { + logger.info("Called onStopped") + } + @Override Set getRelationships() { return [REL_TEST] as Set @@ -41,7 +58,7 @@ class GroovyProcessor implements Processor { if (flowFile == null) { return; } - flowFile = session.putAttribute(flowFile, "from-content", "test content") + flowFile = session.putAttribute(flowFile, 'from-content', setAttributeFromThisInOnScheduled) // transfer session.transfer(flowFile, REL_TEST) session.commit()