mirror of
https://github.com/apache/nifi.git
synced 2025-02-06 01:58:32 +00:00
NIFI-10711 Made the script runner more debuggable.
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #6590
This commit is contained in:
parent
8d2789dbc4
commit
b6d95faa95
@ -247,6 +247,9 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
|
||||
throw new ProcessException(t);
|
||||
}
|
||||
} catch (final Throwable t) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
|
||||
}
|
||||
// Mimic AbstractProcessor behavior here
|
||||
getLogger().error("{} failed to process due to {}; rolling back session", this, t);
|
||||
|
||||
|
@ -582,6 +582,9 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|
||||
// run the processor
|
||||
instance.onTrigger(context, sessionFactory);
|
||||
} catch (final ProcessException e) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
|
||||
}
|
||||
final String message = String.format("An error occurred executing the configured Processor [%s]: %s",
|
||||
context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue(), e);
|
||||
log.error(message);
|
||||
|
@ -22,6 +22,7 @@ import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptException;
|
||||
|
||||
public interface ScriptRunner {
|
||||
String getScript();
|
||||
|
||||
String getScriptEngineName();
|
||||
|
||||
|
@ -132,6 +132,9 @@ public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
|
||||
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
|
||||
evaluator = createEvaluator(scriptEngine, flowFile);
|
||||
} catch (final ScriptException se) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
|
||||
}
|
||||
getLogger().error("Failed to initialize script engine", se);
|
||||
session.transfer(flowFile, RELATIONSHIP_FAILURE);
|
||||
return;
|
||||
|
@ -99,6 +99,9 @@ public abstract class ScriptedRouterProcessor<T> extends ScriptedRecordProcessor
|
||||
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
|
||||
evaluator = createEvaluator(scriptEngine, flowFile);
|
||||
} catch (final ScriptException se) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
|
||||
}
|
||||
getLogger().error("Failed to initialize script engine", se);
|
||||
session.transfer(flowFile, getFailureRelationship());
|
||||
return;
|
||||
|
@ -118,6 +118,9 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
|
||||
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
|
||||
evaluator = createEvaluator(scriptEngine, flowFile);
|
||||
} catch (final ScriptException se) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
|
||||
}
|
||||
getLogger().error("Failed to initialize script engine", se);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
|
@ -17,8 +17,8 @@
|
||||
package org.apache.nifi.record.script;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.AbstractScriptedControllerService;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
package org.apache.nifi.script.impl;
|
||||
|
||||
import org.apache.nifi.processors.script.ScriptRunner;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import javax.script.ScriptEngine;
|
||||
|
||||
@ -31,13 +32,25 @@ public abstract class BaseScriptRunner implements ScriptRunner {
|
||||
protected String[] modulePaths;
|
||||
|
||||
public BaseScriptRunner(final ScriptEngine engine, final String scriptBody, final String[] modulePaths) {
|
||||
this(engine, scriptBody, null, modulePaths);
|
||||
}
|
||||
|
||||
public BaseScriptRunner(final ScriptEngine engine, final String scriptBody, final String preloads, final String[] modulePaths) {
|
||||
this.scriptEngine = engine;
|
||||
this.scriptBody = scriptBody;
|
||||
this.modulePaths = modulePaths;
|
||||
if (StringUtils.isNotEmpty(preloads)) {
|
||||
this.scriptBody = preloads + scriptBody;
|
||||
} else {
|
||||
this.scriptBody = scriptBody;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScriptEngine getScriptEngine() {
|
||||
return scriptEngine;
|
||||
}
|
||||
|
||||
public String getScript() {
|
||||
return scriptBody;
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,14 @@ public class ClojureScriptRunner extends BaseScriptRunner {
|
||||
+ ")\n";
|
||||
|
||||
public ClojureScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
|
||||
super(engine, scriptBody, modulePaths);
|
||||
super(engine, scriptBody, buildPreloads(engine), modulePaths);
|
||||
}
|
||||
|
||||
private static String buildPreloads(ScriptEngine engine) {
|
||||
return "(ns " + ((ClojureScriptEngine) engine).getNamespace() +
|
||||
" " +
|
||||
PRELOADS +
|
||||
")\n";
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -57,12 +64,7 @@ public class ClojureScriptRunner extends BaseScriptRunner {
|
||||
|
||||
@Override
|
||||
public void run(Bindings bindings) throws ScriptException {
|
||||
String sb = "(ns " + ((ClojureScriptEngine) scriptEngine).getNamespace() +
|
||||
" " +
|
||||
PRELOADS +
|
||||
")\n" +
|
||||
scriptBody;
|
||||
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
|
||||
scriptEngine.eval(sb);
|
||||
scriptEngine.eval(scriptBody);
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ public class GroovyScriptRunner extends BaseScriptRunner {
|
||||
+ "import org.apache.nifi.lookup.*\n";
|
||||
|
||||
public GroovyScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
|
||||
super(engine, scriptBody, modulePaths);
|
||||
super(engine, scriptBody, PRELOADS, modulePaths);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -48,6 +48,6 @@ public class GroovyScriptRunner extends BaseScriptRunner {
|
||||
@Override
|
||||
public void run(Bindings bindings) throws ScriptException {
|
||||
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
|
||||
scriptEngine.eval(PRELOADS + scriptBody);
|
||||
scriptEngine.eval(scriptBody);
|
||||
}
|
||||
}
|
||||
|
@ -34,12 +34,15 @@ public class JythonScriptRunner extends BaseScriptRunner {
|
||||
private final CompiledScript compiledScript;
|
||||
|
||||
public JythonScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
|
||||
super(engine, scriptBody, modulePaths);
|
||||
super(engine, scriptBody, buildPreloads(modulePaths), modulePaths);
|
||||
// Add prefix for import sys and all jython modules
|
||||
String prefix = "import sys\n"
|
||||
compiledScript = ((Compilable) engine).compile(this.scriptBody);
|
||||
}
|
||||
|
||||
private static String buildPreloads(String[] modulePaths) {
|
||||
return "import sys\n"
|
||||
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
|
||||
.collect(Collectors.joining("\n")) + "\n";
|
||||
compiledScript = ((Compilable) engine).compile(prefix + scriptBody);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user