NIFI-1822: Allow concurrent execution in ExecuteScript

This commit is contained in:
Matt Burgess 2016-04-28 09:53:16 -04:00 committed by Andy LoPresto
parent 87d96c0225
commit 23e4f685c3
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
3 changed files with 42 additions and 17 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -48,7 +49,9 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -102,7 +105,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
protected String scriptBody;
protected String[] modules;
protected List<PropertyDescriptor> descriptors;
protected ScriptEngine scriptEngine;
protected BlockingQueue<ScriptEngine> engineQ = null;
/**
* Custom validation for ensuring exactly one of Script File or Script Body is populated
@ -197,7 +201,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
*/
public void setup() {
public void setup(int numberOfScriptEngines) {
if (scriptEngineConfiguratorMap.isEmpty()) {
ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
@ -206,7 +210,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator);
}
}
setupEngine();
setupEngines(numberOfScriptEngines);
}
/**
@ -216,7 +220,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
*
* @see org.apache.nifi.processors.script.ScriptEngineConfigurator
*/
protected void setupEngine() {
protected void setupEngines(int numberOfScriptEngines) {
engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines);
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
ProcessorLog log = getLogger();
@ -248,11 +253,13 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
}
scriptEngine = createScriptEngine();
ScriptEngine scriptEngine = createScriptEngine();
try {
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
engineQ.offer(scriptEngine);
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) {
@ -300,4 +307,11 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
return new URLClassLoader(modules, thisClassLoader);
}
@OnStopped
public void stop() {
if (engineQ != null) {
engineQ.clear();
}
}
}

View File

@ -18,8 +18,6 @@ package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -35,6 +33,7 @@ import org.apache.nifi.util.StringUtils;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
@ -45,7 +44,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@TriggerSerially
@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
@CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
@ -128,7 +126,9 @@ public class ExecuteScript extends AbstractScriptProcessor {
} else {
modules = new String[0];
}
super.setup();
// Create a script engine for each possible task
int maxTasks = context.getMaxConcurrentTasks();
super.setup(maxTasks);
scriptToRun = scriptBody;
try {
@ -161,6 +161,11 @@ public class ExecuteScript extends AbstractScriptProcessor {
createResources();
}
}
ScriptEngine scriptEngine = engineQ.poll();
if (scriptEngine == null) {
// No engine available so nothing more to do here
return;
}
ProcessorLog log = getLogger();
ProcessSession session = sessionFactory.createSession();
try {
@ -211,11 +216,8 @@ public class ExecuteScript extends AbstractScriptProcessor {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
session.rollback(true);
throw t;
} finally {
engineQ.offer(scriptEngine);
}
}
@OnStopped
public void stop() {
scriptEngine = null;
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import org.apache.commons.io.IOUtils;
@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private ScriptEngine scriptEngine = null;
/**
* Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
* processor has defined additional relationships, those will be added as well.
@ -174,9 +177,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
setup();
}
@Override
public void setup() {
super.setup();
// Create a single script engine, the Processor object is reused by each task
super.setup(1);
scriptEngine = engineQ.poll();
if (scriptEngine == null) {
throw new ProcessException("No script engine available!");
}
if (scriptNeedsReload.get() || processor.get() == null) {
if (isFile(scriptPath)) {
reloadScriptFile(scriptPath);
@ -386,7 +394,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
Collection<ValidationResult> commonValidationResults = super.customValidate(context);
if(!commonValidationResults.isEmpty()) {
if (!commonValidationResults.isEmpty()) {
return commonValidationResults;
}
@ -486,6 +494,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
@OnStopped
public void stop() {
super.stop();
processor.set(null);
}
}