NIFI-8330: Fix compilation of Jython scripts for scripting components

- Add onPropertyModified to ExecuteScript to recompile on change

This closes #4903.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matthew Burgess 2021-03-16 21:24:26 -04:00 committed by Mark Payne
parent 340b13033f
commit a4d435c995
15 changed files with 161 additions and 10 deletions

View File

@ -273,6 +273,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -150,6 +150,35 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
return scriptingComponentHelper.customValidate(validationContext);
}
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
// Reset the configurator on change, this can indicate to the configurator to recompile the script on next init()
String scriptEngineName = scriptingComponentHelper.getScriptEngineName();
if (scriptEngineName != null) {
ScriptEngineConfigurator configurator =
scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
if (configurator != null) {
configurator.reset();
}
}
}
}
/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
* properties, as well as reloading the script (from file or the "Script Body" property)
@ -243,6 +272,13 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
// class with InvokeScriptedProcessor
session.commit();
} catch (ScriptException e) {
// Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
ScriptEngineConfigurator configurator =
scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
}
// The below 'session.rollback(true)' reverts any changes made during this session (all FlowFiles are
// restored back to their initial session state and back to their original queues after being penalized).
// However if the incoming relationship is full of flow files, this processor will keep failing and could
@ -253,7 +289,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
}
} catch (final Throwable t) {
// Mimic AbstractProcessor behavior here
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
getLogger().error("{} failed to process due to {}; rolling back session", this, t);
// the rollback might not penalize the incoming flow file if the exception is thrown before the user gets
// the flow file from the session binding (ff = session.get()).

View File

@ -349,6 +349,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -37,4 +37,6 @@ public interface ScriptEngineConfigurator {
Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;
default void reset() {
}
}

View File

@ -91,6 +91,7 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -95,6 +95,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -144,6 +144,7 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -195,6 +195,14 @@ public class ScriptedReportingTask extends AbstractReportingTask {
scriptEngine.eval(scriptToRun);
}
} catch (ScriptException e) {
// Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
ScriptEngineConfigurator configurator =
scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.reset();
}
throw new ProcessException(e);
}
} catch (final Throwable t) {

View File

@ -115,6 +115,7 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -117,6 +117,7 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.reset();
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {

View File

@ -67,10 +67,15 @@ public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator
if (engine != null) {
final CompiledScript existing = compiledScriptRef.get();
if (existing == null) {
throw new ScriptException("Jython script has not been compiled, the processor must be restarted.");
throw new ScriptException("Jython script has not been compiled successfully, the component must be restarted.");
}
returnValue = compiledScriptRef.get().eval();
}
return returnValue;
}
@Override
public void reset() {
compiledScriptRef.set(null);
}
}

View File

@ -39,14 +39,14 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class)
@BeforeClass
public static void setUpOnce() throws Exception {
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
public void setUp() throws Exception {
void setUp() throws Exception {
super.setupExecuteScript()
runner.setValidateExpressionUsage(false)
@ -56,8 +56,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
}
@After
public void tearDown() throws Exception {
void tearDown() throws Exception {
}
private void setupPooledExecuteScript(int poolSize = 2) {
@ -76,7 +75,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
}
@Test
public void testShouldExecuteScript() throws Exception {
void testShouldExecuteScript() throws Exception {
// Arrange
final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/
@ -98,7 +97,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
}
@Test
public void testShouldExecuteScriptSerially() throws Exception {
void testShouldExecuteScriptSerially() throws Exception {
// Arrange
final int ITERATIONS = 10
@ -122,7 +121,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
}
@Test
public void testShouldExecuteScriptWithPool() throws Exception {
void testShouldExecuteScriptWithPool() throws Exception {
// Arrange
final int ITERATIONS = 10
final int POOL_SIZE = 2
@ -153,7 +152,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
@Ignore("This test fails intermittently when the serial execution happens faster than pooled")
@Test
public void testPooledExecutionShouldBeFaster() throws Exception {
void testPooledExecutionShouldBeFaster() throws Exception {
// Arrange
final int ITERATIONS = 1000
final int POOL_SIZE = 4
@ -200,4 +199,29 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
assert serialExecutionTime > parallelExecutionTime
}
@Test
void testExecuteScriptRecompileOnChange() throws Exception {
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeHello_executescript.groovy")
runner.enqueue('')
runner.run()
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1)
List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
MockFlowFile flowFile = result.get(0)
flowFile.assertAttributeExists('greeting')
flowFile.assertAttributeEquals('greeting', 'hello')
runner.clearTransferState()
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeGoodbye_executescript.groovy")
runner.enqueue('')
runner.run()
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1)
result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
flowFile = result.get(0)
flowFile.assertAttributeExists('greeting')
flowFile.assertAttributeEquals('greeting', 'good-bye')
}
}

View File

@ -61,6 +61,33 @@ public class TestInvokeJython extends BaseScriptTest {
Assert.assertEquals("Never valid.", results.iterator().next().getExplanation());
}
/**
* Tests a script that has a Jython processor that begins invalid then is fixed.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testInvalidThenFixed() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
final Collection<ValidationResult> results = ((MockProcessContext) runner.getProcessContext()).validate();
Assert.assertEquals(1L, results.size());
Assert.assertEquals("Never valid.", results.iterator().next().getExplanation());
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py");
runner.setProperty("for-attributes", "value-1");
final Map<String, String> attributes = new HashMap<>();
attributes.put("for-attributes", "value-2");
runner.assertValid();
runner.enqueue(new byte[0], attributes);
runner.run();
}
/**
* Test a script that has a Jython processor that reads a value from a processor property and another from a flowfile attribute then stores both in the attributes of the flowfile being routed.
* <p>

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
flowfile = session.get()
if(!flowfile) return
flowfile = session.putAttribute(flowfile, 'greeting', 'good-bye')
session.transfer(flowfile, REL_SUCCESS)

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
flowfile = session.get()
if(!flowfile) return
flowfile = session.putAttribute(flowfile, 'greeting', 'hello')
session.transfer(flowfile, REL_SUCCESS)