mirror of https://github.com/apache/nifi.git
NIFI-8080: Compile Jython scripts before evaluating
This closes #4717 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
cfbcecc4c6
commit
2c0671cf80
|
@ -20,15 +20,22 @@ import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
|
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
|
||||||
import org.python.core.PyString;
|
import org.python.core.PyString;
|
||||||
|
|
||||||
|
import javax.script.Compilable;
|
||||||
|
import javax.script.CompiledScript;
|
||||||
import javax.script.ScriptEngine;
|
import javax.script.ScriptEngine;
|
||||||
import javax.script.ScriptException;
|
import javax.script.ScriptException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A helper class to configure the Jython engine with any specific requirements
|
* A helper class to configure the Jython engine with any specific requirements
|
||||||
*/
|
*/
|
||||||
public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator {
|
public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator {
|
||||||
|
|
||||||
|
private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getScriptEngineName() {
|
public String getScriptEngineName() {
|
||||||
return "python";
|
return "python";
|
||||||
|
@ -41,17 +48,9 @@ public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
|
public Object init(ScriptEngine engine, String[] modulePaths) {
|
||||||
if (engine != null) {
|
// Always compile when first run
|
||||||
// Need to import the module path inside the engine, in order to pick up
|
compiledScriptRef.set(null);
|
||||||
// other Python/Jython modules.
|
|
||||||
engine.eval("import sys");
|
|
||||||
if (modulePaths != null) {
|
|
||||||
for (String modulePath : modulePaths) {
|
|
||||||
engine.eval("sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +58,17 @@ public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator
|
||||||
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
|
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
|
||||||
Object returnValue = null;
|
Object returnValue = null;
|
||||||
if (engine != null) {
|
if (engine != null) {
|
||||||
returnValue = engine.eval(scriptBody);
|
final CompiledScript existing = compiledScriptRef.get();
|
||||||
|
if (existing == null) {
|
||||||
|
|
||||||
|
// Add prefix for import sys and all jython modules
|
||||||
|
String prefix = "import sys\n"
|
||||||
|
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
|
||||||
|
.collect(Collectors.joining("\n"));
|
||||||
|
final CompiledScript compiled = ((Compilable) engine).compile(prefix + scriptBody);
|
||||||
|
compiledScriptRef.compareAndSet(null, compiled);
|
||||||
|
}
|
||||||
|
returnValue = compiledScriptRef.get().eval();
|
||||||
}
|
}
|
||||||
return returnValue;
|
return returnValue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.script;
|
||||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -73,6 +74,28 @@ public class TestExecuteJython extends BaseScriptTest {
|
||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
|
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
|
||||||
runner.run();
|
runner.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore("This is more of an integration test, can be run before and after changes to ExecuteScript to measure performance improvements")
|
||||||
|
@Test
|
||||||
|
public void testPerformance() throws Exception {
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
|
||||||
|
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
|
||||||
|
"from org.apache.nifi.processors.script import ExecuteScript\n"
|
||||||
|
+ "flowFile = session.get()\n"
|
||||||
|
+ "flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n"
|
||||||
|
+ "session.transfer(flowFile, ExecuteScript.REL_SUCCESS)");
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
final int ITERATIONS = 50000;
|
||||||
|
for (int i = 0; i < ITERATIONS; i++) {
|
||||||
|
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
runner.run(ITERATIONS);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS);
|
||||||
|
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
|
||||||
|
result.get(0).assertAttributeEquals("from-content", "test content");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue