NIFI-8080: Move script compilation to configurator init method

This closes #4718

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Burgess 2020-12-09 14:24:40 -05:00 committed by Mike Thomsen
parent 1d31434c48
commit 1f8b4e4779
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
17 changed files with 103 additions and 30 deletions

View File

@ -273,6 +273,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -232,6 +232,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
} else {
scriptEngine.eval(scriptToRun);

View File

@ -349,6 +349,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -33,7 +33,7 @@ public interface ScriptEngineConfigurator {
URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log);
Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException;
Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;
Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;

View File

@ -91,6 +91,7 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -95,6 +95,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -144,6 +144,7 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -189,6 +189,7 @@ public class ScriptedReportingTask extends AbstractReportingTask {
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
} else {
scriptEngine.eval(scriptToRun);

View File

@ -115,6 +115,7 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -117,6 +117,7 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script

View File

@ -29,7 +29,6 @@ import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@ -267,19 +266,8 @@ public class ScriptingComponentHelper {
for (int i = 0; i < numberOfScriptEngines; i++) {
ScriptEngine scriptEngine = createScriptEngine();
try {
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
if (!engineQ.offer(scriptEngine)) {
log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()});
}
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se);
}
if (!engineQ.offer(scriptEngine)) {
log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()});
}
}
} finally {

View File

@ -54,7 +54,7 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
scriptEngine = engine;
return scriptEngine;
}

View File

@ -44,7 +44,9 @@ public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderCon
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
// No need to compile the script here, Groovy does it under the hood and its CompiledScript object just
// calls engine.eval() the same as we do in the eval() method below
scriptEngine = engine;
return scriptEngine;
}

View File

@ -30,7 +30,7 @@ public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloade
}
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
// No initialization methods needed at present
return engine;
}

View File

@ -48,10 +48,17 @@ public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator
}
@Override
public Object init(ScriptEngine engine, String[] modulePaths) {
public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
// Always compile when first run
compiledScriptRef.set(null);
return null;
if (engine != null && compiledScriptRef.get() == null) {
// Add prefix for import sys and all jython modules
String prefix = "import sys\n"
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
.collect(Collectors.joining("\n"));
final CompiledScript compiled = ((Compilable) engine).compile(prefix + scriptBody);
compiledScriptRef.set(compiled);
}
return compiledScriptRef.get();
}
@Override
@ -60,13 +67,7 @@ public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator
if (engine != null) {
final CompiledScript existing = compiledScriptRef.get();
if (existing == null) {
// Add prefix for import sys and all jython modules
String prefix = "import sys\n"
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
.collect(Collectors.joining("\n"));
final CompiledScript compiled = ((Compilable) engine).compile(prefix + scriptBody);
compiledScriptRef.compareAndSet(null, compiled);
throw new ScriptException("Jython script has not been compiled, the processor must be restarted.");
}
returnValue = compiledScriptRef.get().eval();
}

View File

@ -58,8 +58,8 @@ import static org.mockito.Mockito.when
* Unit tests for ScriptedReportingTask.
*/
@RunWith(JUnit4.class)
class ScriptedReportingTaskGroovyTest {
private static final Logger logger = LoggerFactory.getLogger(ScriptedReportingTaskGroovyTest)
class ScriptedReportingTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ScriptedReportingTaskTest)
def task
def runner
def scriptingComponent
@ -197,6 +197,50 @@ class ScriptedReportingTaskGroovyTest {
}
@Test
void testVMEventsJythonScript() {
def properties = [:] as Map<PropertyDescriptor, String>
task.getSupportedPropertyDescriptors().each { PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('python'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/jython/test_log_vm_stats.py'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
// Set up ReportingContext
def context = mock(ReportingContext)
when(context.getStateManager()).thenReturn(new MockStateManager(task))
doAnswer({ invocation ->
PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor)
return new MockPropertyValue(properties[descriptor])
} as Answer<PropertyValue>
).when(context).getProperty(any(PropertyDescriptor))
def logger = mock(ComponentLog)
def initContext = mock(ReportingInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
task.initialize initContext
task.setup configurationContext
task.onTrigger context
def se = task.scriptEngine
// This script should store a variable called x with a map of stats to values
assertTrue se.x?.uptime >= 0
task.offerScriptEngine(se)
}
class MockScriptedReportingTask extends ScriptedReportingTask implements AccessibleScriptingComponentHelper {
def getScriptEngine() {
return scriptingComponentHelper.engineQ.poll()

View File

@ -0,0 +1,29 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
x = {
'uptime': vmMetrics.uptime(),
'heapUsed': vmMetrics.heapUsed(None),
'heapUsage': vmMetrics.heapUsage(),
'nonHeapUsage': vmMetrics.nonHeapUsage(),
'threadCount': vmMetrics.threadCount(),
'daemonThreadCount': vmMetrics.daemonThreadCount(),
'fileDescriptorUsage': vmMetrics.fileDescriptorUsage()
}