From a4d435c9952f306f748fa004fc839eba318f6187 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Tue, 16 Mar 2021 21:24:26 -0400 Subject: [PATCH] NIFI-8330: Fix compilation of Jython scripts for scripting components - Add onPropertyModified to ExecuteScript to recompile on change This closes #4903. Signed-off-by: Mark Payne --- .../script/BaseScriptedLookupService.java | 1 + .../nifi/processors/script/ExecuteScript.java | 38 +++++++++++++++++- .../script/InvokeScriptedProcessor.java | 1 + .../script/ScriptEngineConfigurator.java | 2 + .../nifi/record/script/ScriptedReader.java | 1 + .../script/ScriptedRecordSetWriter.java | 1 + .../sink/script/ScriptedRecordSink.java | 1 + .../script/ScriptedReportingTask.java | 8 ++++ .../engine/script/ScriptedRulesEngine.java | 1 + .../script/ScriptedActionHandler.java | 1 + .../impl/JythonScriptEngineConfigurator.java | 7 +++- .../script/ExecuteScriptGroovyTest.groovy | 40 +++++++++++++++---- .../processors/script/TestInvokeJython.java | 27 +++++++++++++ .../setAttributeGoodbye_executescript.groovy | 21 ++++++++++ .../setAttributeHello_executescript.groovy | 21 ++++++++++ 15 files changed, 161 insertions(+), 10 deletions(-) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeGoodbye_executescript.groovy create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeHello_executescript.groovy diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java index 7d63724fc4..ab2cc4793a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java @@ -273,6 +273,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService // Find a custom configurator and invoke their eval() method ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); if (configurator != null) { + configurator.reset(); configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); } else { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 82ec3ad634..362d942531 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -150,6 +150,35 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se return scriptingComponentHelper.customValidate(validationContext); } + + /** + * Handles changes to this processor's properties. If changes are made to + * script- or engine-related properties, the script will be reloaded. + * + * @param descriptor of the modified property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property + */ + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) + || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) + || ScriptingComponentUtils.MODULES.equals(descriptor) + || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + + // Reset the configurator on change, this can indicate to the configurator to recompile the script on next init() + String scriptEngineName = scriptingComponentHelper.getScriptEngineName(); + if (scriptEngineName != null) { + ScriptEngineConfigurator configurator = + scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); + if (configurator != null) { + configurator.reset(); + } + } + } + } + /** * Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's * properties, as well as reloading the script (from file or the "Script Body" property) @@ -243,6 +272,13 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se // class with InvokeScriptedProcessor session.commit(); } catch (ScriptException e) { + // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init() + ScriptEngineConfigurator configurator = + scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + if (configurator != null) { + configurator.reset(); + } + // The below 'session.rollback(true)' reverts any changes made during this session (all FlowFiles are // restored back to their initial session state and back to their original queues after being penalized). // However if the incoming relationship is full of flow files, this processor will keep failing and could @@ -253,7 +289,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se } } catch (final Throwable t) { // Mimic AbstractProcessor behavior here - getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + getLogger().error("{} failed to process due to {}; rolling back session", this, t); // the rollback might not penalize the incoming flow file if the exception is thrown before the user gets // the flow file from the session binding (ff = session.get()). diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index fafaf8ab46..366f8fa85f 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -349,6 +349,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor { // Find a custom configurator and invoke their eval() method ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); if (configurator != null) { + configurator.reset(); configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); } else { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java index c734976e22..995c16ebd8 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java @@ -37,4 +37,6 @@ public interface ScriptEngineConfigurator { Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException; + default void reset() { + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java index 25d0563d65..009d1d2d25 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java @@ -91,6 +91,7 @@ public class ScriptedReader extends AbstractScriptedRecordFactory logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") } } @Before - public void setUp() throws Exception { + void setUp() throws Exception { super.setupExecuteScript() runner.setValidateExpressionUsage(false) @@ -56,8 +56,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { } @After - public void tearDown() throws Exception { - + void tearDown() throws Exception { } private void setupPooledExecuteScript(int poolSize = 2) { @@ -76,7 +75,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { } @Test - public void testShouldExecuteScript() throws Exception { + void testShouldExecuteScript() throws Exception { // Arrange final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ @@ -98,7 +97,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { } @Test - public void testShouldExecuteScriptSerially() throws Exception { + void testShouldExecuteScriptSerially() throws Exception { // Arrange final int ITERATIONS = 10 @@ -122,7 +121,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { } @Test - public void testShouldExecuteScriptWithPool() throws Exception { + void testShouldExecuteScriptWithPool() throws Exception { // Arrange final int ITERATIONS = 10 final int POOL_SIZE = 2 @@ -153,7 +152,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { @Ignore("This test fails intermittently when the serial execution happens faster than pooled") @Test - public void testPooledExecutionShouldBeFaster() throws Exception { + void testPooledExecutionShouldBeFaster() throws Exception { // Arrange final int ITERATIONS = 1000 final int POOL_SIZE = 4 @@ -200,4 +199,29 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { assert serialExecutionTime > parallelExecutionTime } + + @Test + void testExecuteScriptRecompileOnChange() throws Exception { + + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeHello_executescript.groovy") + runner.enqueue('') + runner.run() + + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + flowFile.assertAttributeExists('greeting') + flowFile.assertAttributeEquals('greeting', 'hello') + runner.clearTransferState() + + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeGoodbye_executescript.groovy") + runner.enqueue('') + runner.run() + + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + flowFile = result.get(0) + flowFile.assertAttributeExists('greeting') + flowFile.assertAttributeEquals('greeting', 'good-bye') + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java index 5b17e0422b..3b90d4ac96 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeJython.java @@ -61,6 +61,33 @@ public class TestInvokeJython extends BaseScriptTest { Assert.assertEquals("Never valid.", results.iterator().next().getExplanation()); } + /** + * Tests a script that has a Jython processor that begins invalid then is fixed. + * + * @throws Exception Any error encountered while testing + */ + @Test + public void testInvalidThenFixed() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor()); + runner.setValidateExpressionUsage(false); + runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python"); + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py"); + + final Collection results = ((MockProcessContext) runner.getProcessContext()).validate(); + Assert.assertEquals(1L, results.size()); + Assert.assertEquals("Never valid.", results.iterator().next().getExplanation()); + + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py"); + runner.setProperty("for-attributes", "value-1"); + + final Map attributes = new HashMap<>(); + attributes.put("for-attributes", "value-2"); + + runner.assertValid(); + runner.enqueue(new byte[0], attributes); + runner.run(); + } + /** * Test a script that has a Jython processor that reads a value from a processor property and another from a flowfile attribute then stores both in the attributes of the flowfile being routed. *

diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeGoodbye_executescript.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeGoodbye_executescript.groovy new file mode 100644 index 0000000000..83780cdd11 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeGoodbye_executescript.groovy @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the 'License') you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +flowfile = session.get() +if(!flowfile) return +flowfile = session.putAttribute(flowfile, 'greeting', 'good-bye') +session.transfer(flowfile, REL_SUCCESS) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeHello_executescript.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeHello_executescript.groovy new file mode 100644 index 0000000000..0c531e170d --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/setAttributeHello_executescript.groovy @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the 'License') you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +flowfile = session.get() +if(!flowfile) return +flowfile = session.putAttribute(flowfile, 'greeting', 'hello') +session.transfer(flowfile, REL_SUCCESS)