From 23e4f685c3e6a5c83538b7e3e379f1bb1c6002b7 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 28 Apr 2016 09:53:16 -0400 Subject: [PATCH] NIFI-1822: Allow concurrent execution in ExecuteScript --- .../script/AbstractScriptProcessor.java | 24 +++++++++++++++---- .../nifi/processors/script/ExecuteScript.java | 20 +++++++++------- .../script/InvokeScriptedProcessor.java | 15 +++++++++--- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index b7e73bc950..99a18bf73d 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.script; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -48,7 +49,9 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -102,7 +105,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc protected String scriptBody; protected String[] modules; protected List descriptors; - protected ScriptEngine scriptEngine; + + protected BlockingQueue engineQ = null; /** * Custom validation for ensuring exactly one of Script File or Script Body is populated @@ -197,7 +201,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * Performs common setup operations when the processor is scheduled to run. This method assumes the member * variables associated with properties have been filled. */ - public void setup() { + public void setup(int numberOfScriptEngines) { if (scriptEngineConfiguratorMap.isEmpty()) { ServiceLoader configuratorServiceLoader = @@ -206,7 +210,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); } } - setupEngine(); + setupEngines(numberOfScriptEngines); } /** @@ -216,7 +220,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * * @see org.apache.nifi.processors.script.ScriptEngineConfigurator */ - protected void setupEngine() { + protected void setupEngines(int numberOfScriptEngines) { + engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); try { ProcessorLog log = getLogger(); @@ -248,11 +253,13 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - scriptEngine = createScriptEngine(); + ScriptEngine scriptEngine = createScriptEngine(); try { if (configurator != null) { configurator.init(scriptEngine, modules); } + engineQ.offer(scriptEngine); + } catch (ScriptException se) { log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); if (log.isDebugEnabled()) { @@ -300,4 +307,11 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc return new URLClassLoader(modules, thisClassLoader); } + + @OnStopped + public void stop() { + if (engineQ != null) { + engineQ.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 3f7c202d51..7d5ee5225a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -18,8 +18,6 @@ package org.apache.nifi.processors.script; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -35,6 +33,7 @@ import org.apache.nifi.util.StringUtils; import javax.script.Bindings; import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import javax.script.SimpleBindings; import java.io.FileInputStream; @@ -45,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -@TriggerSerially @Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) @CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " @@ -128,7 +126,9 @@ public class ExecuteScript extends AbstractScriptProcessor { } else { modules = new String[0]; } - super.setup(); + // Create a script engine for each possible task + int maxTasks = context.getMaxConcurrentTasks(); + super.setup(maxTasks); scriptToRun = scriptBody; try { @@ -161,6 +161,11 @@ public class ExecuteScript extends AbstractScriptProcessor { createResources(); } } + ScriptEngine scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + // No engine available so nothing more to do here + return; + } ProcessorLog log = getLogger(); ProcessSession session = sessionFactory.createSession(); try { @@ -211,11 +216,8 @@ public class ExecuteScript extends AbstractScriptProcessor { getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); session.rollback(true); throw t; + } finally { + engineQ.offer(scriptEngine); } } - - @OnStopped - public void stop() { - scriptEngine = null; - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 2f287b6103..06c0a7f18b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.script.Invocable; +import javax.script.ScriptEngine; import javax.script.ScriptException; import org.apache.commons.io.IOUtils; @@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + private ScriptEngine scriptEngine = null; + /** * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script * processor has defined additional relationships, those will be added as well. @@ -174,9 +177,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { setup(); } - @Override public void setup() { - super.setup(); + // Create a single script engine, the Processor object is reused by each task + super.setup(1); + scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + if (scriptNeedsReload.get() || processor.get() == null) { if (isFile(scriptPath)) { reloadScriptFile(scriptPath); @@ -386,7 +394,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { protected Collection customValidate(final ValidationContext context) { Collection commonValidationResults = super.customValidate(context); - if(!commonValidationResults.isEmpty()) { + if (!commonValidationResults.isEmpty()) { return commonValidationResults; } @@ -486,6 +494,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { @OnStopped public void stop() { + super.stop(); processor.set(null); } }