NIFI-2665: This closes #942. Fixed intermittent validation errors in InvokeScriptedProcessor

This commit is contained in:
Matt Burgess 2016-08-25 15:25:26 -04:00 committed by joewitt
parent 1624dd8e4d
commit a84d3c9873
2 changed files with 17 additions and 25 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.script;
import org.apache.nifi.logging.ComponentLog;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@ -42,6 +43,7 @@ import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -199,6 +201,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
/**
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
*
* @param numberOfScriptEngines number of engines to setup
*/
public void setup(int numberOfScriptEngines) {
@ -253,7 +256,9 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
// Need the right classloader when the engine is created. This ensures the NAR's execution class loader
// (plus the module path) becomes the parent for the script engine
ClassLoader scriptEngineModuleClassLoader = createScriptEngineModuleClassLoader(additionalClasspathURLs);
ClassLoader scriptEngineModuleClassLoader = additionalClasspathURLs != null
? new URLClassLoader(additionalClasspathURLs, originalContextClassLoader)
: originalContextClassLoader;
if (scriptEngineModuleClassLoader != null) {
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
}
@ -264,7 +269,9 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
engineQ.offer(scriptEngine);
if (!engineQ.offer(scriptEngine)) {
log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()});
}
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
@ -296,25 +303,6 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
return factory.getScriptEngine();
}
/**
* Creates a classloader to be used by the selected script engine and the provided script file. This
* classloader has this class's classloader as a parent (versus the current thread's context
* classloader) and also adds the specified module URLs to the classpath. This enables scripts
* to use other scripts, modules, etc. without having to build them into the scripting NAR.
* If the parameter is null or empty, this class's classloader is returned
*
* @param modules An array of URLs to add to the class loader
* @return ClassLoader for script engine
*/
protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) {
ClassLoader thisClassLoader = this.getClass().getClassLoader();
if (modules == null) {
return thisClassLoader;
}
return new URLClassLoader(modules, thisClassLoader);
}
@OnStopped
public void stop() {
if (engineQ != null) {

View File

@ -67,9 +67,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private final AtomicReference<Processor> processor = new AtomicReference<>();
private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private ScriptEngine scriptEngine = null;
private volatile ScriptEngine scriptEngine = null;
private volatile String kerberosServicePrincipal = null;
private volatile File kerberosConfigFile = null;
private volatile File kerberosServiceKeytab = null;
@ -195,8 +195,11 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
public void setup() {
// Create a single script engine, the Processor object is reused by each task
super.setup(1);
scriptEngine = engineQ.poll();
if(scriptEngine == null) {
super.setup(1);
scriptEngine = engineQ.poll();
}
if (scriptEngine == null) {
throw new ProcessException("No script engine available!");
}
@ -535,5 +538,6 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
public void stop() {
super.stop();
processor.set(null);
scriptEngine = null;
}
}