NIFI-1822 Added for loop to instantiate multiple script engines in queue.

This commit is contained in:
Andy LoPresto 2016-05-13 19:06:17 -07:00
parent 8c5ba51128
commit 8ab0ce59a4
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
1 changed files with 31 additions and 27 deletions

View File

@ -16,22 +16,6 @@
*/ */
package org.apache.nifi.processors.script; package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File; import java.io.File;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
@ -53,6 +37,21 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
/** /**
* This class contains variables and methods common to scripting processors * This class contains variables and methods common to scripting processors
@ -226,6 +225,10 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
try { try {
ProcessorLog log = getLogger(); ProcessorLog log = getLogger();
if (StringUtils.isBlank(scriptEngineName)) {
throw new IllegalArgumentException("The script engine name cannot be null");
}
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
@ -253,20 +256,21 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
} }
ScriptEngine scriptEngine = createScriptEngine(); for (int i = 0; i < numberOfScriptEngines; i++) {
try { ScriptEngine scriptEngine = createScriptEngine();
if (configurator != null) { try {
configurator.init(scriptEngine, modules); if (configurator != null) {
} configurator.init(scriptEngine, modules);
engineQ.offer(scriptEngine); }
engineQ.offer(scriptEngine);
} catch (ScriptException se) { } catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se); log.error("Error initializing script engine configurator", se);
}
} }
} }
} finally { } finally {
// Restore original context class loader // Restore original context class loader
Thread.currentThread().setContextClassLoader(originalContextClassLoader); Thread.currentThread().setContextClassLoader(originalContextClassLoader);