NIFI-210: Add support for multiple module paths for scripting

This commit is contained in:
Matt Burgess 2016-02-03 10:25:18 -05:00
parent 2d6fec5fa9
commit 478226451c
10 changed files with 234 additions and 79 deletions

View File

@ -86,10 +86,10 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
.name("Module Directory")
.description("Path to a directory which contains modules required by the script.")
.description("Comma-separated list of paths to files and/or directories which contain modules required by the script.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(new StandardValidators.DirectoryExistsValidator(true, false))
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// A map from engine name to a custom configurator for that engine
@ -100,7 +100,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
protected String scriptEngineName;
protected String scriptPath;
protected String scriptBody;
protected String modulePath;
protected String[] modules;
protected List<PropertyDescriptor> descriptors;
protected ScriptEngine scriptEngine;
@ -196,7 +196,6 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
/**
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
*
*/
public void setup() {
@ -204,11 +203,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
ServiceLoader.load(ScriptEngineConfigurator.class);
for (ScriptEngineConfigurator configurator : configuratorServiceLoader) {
String configuratorScriptEngineName = configurator.getScriptEngineName();
if (configuratorScriptEngineName != null
&& configuratorScriptEngineName.equals(scriptEngineName)) {
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName(), configurator);
}
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator);
}
}
setupEngine();
@ -226,29 +221,42 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
try {
ProcessorLog log = getLogger();
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
URL[] additionalClasspathURLs = null;
if (configurator != null) {
additionalClasspathURLs = configurator.getModuleURLsForClasspath(modules, log);
} else {
if (modules != null) {
List<URL> urls = new LinkedList<>();
for (String modulePathString : modules) {
try {
urls.add(new File(modulePathString).toURI().toURL());
} catch (MalformedURLException mue) {
log.error("{} is not a valid file, ignoring", new Object[]{modulePathString}, mue);
}
}
additionalClasspathURLs = urls.toArray(new URL[urls.size()]);
}
}
// Need the right classloader when the engine is created. This ensures the NAR's execution class loader
// (plus the module path) becomes the parent for the script engine
ClassLoader scriptEngineModuleClassLoader = createScriptEngineModuleClassLoader(modulePath);
ClassLoader scriptEngineModuleClassLoader = createScriptEngineModuleClassLoader(additionalClasspathURLs);
if (scriptEngineModuleClassLoader != null) {
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
}
scriptEngine = createScriptEngine();
ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
ServiceLoader.load(ScriptEngineConfigurator.class);
for (ScriptEngineConfigurator configurator : configuratorServiceLoader) {
String configuratorScriptEngineName = configurator.getScriptEngineName();
try {
if (configuratorScriptEngineName != null
&& configuratorScriptEngineName.equalsIgnoreCase(scriptEngineName)) {
configurator.init(scriptEngine, modulePath);
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName(), configurator);
}
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}",
new Object[]{configuratorScriptEngineName});
if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se);
}
try {
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se);
}
}
@ -278,25 +286,18 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
/**
* Creates a classloader to be used by the selected script engine and the provided script file. This
* classloader has this class's classloader as a parent (versus the current thread's context
* classloader) and also adds the specified module directory to the classpath. This enables scripts
* classloader) and also adds the specified module URLs to the classpath. This enables scripts
* to use other scripts, modules, etc. without having to build them into the scripting NAR.
* If the parameter is null or empty, this class's classloader is returned
*
* @param modulePath The path to a directory containing modules to be used by the script(s)
* @param modules An array of URLs to add to the class loader
*/
protected ClassLoader createScriptEngineModuleClassLoader(String modulePath) {
URLClassLoader newModuleClassLoader = null;
protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) {
ClassLoader thisClassLoader = this.getClass().getClassLoader();
if (StringUtils.isEmpty(modulePath)) {
if (modules == null) {
return thisClassLoader;
}
try {
newModuleClassLoader =
new URLClassLoader(
new URL[]{new File(modulePath).toURI().toURL()}, thisClassLoader);
} catch (MalformedURLException mue) {
getLogger().error("Couldn't find modules directory at " + modulePath, mue);
}
return newModuleClassLoader;
return new URLClassLoader(modules, thisClassLoader);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import javax.script.Bindings;
import javax.script.ScriptContext;
@ -121,7 +122,12 @@ public class ExecuteScript extends AbstractScriptProcessor {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
modulePath = context.getProperty(MODULES).evaluateAttributeExpressions().getValue();
String modulePath = context.getProperty(MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
super.setup();
scriptToRun = scriptBody;
@ -182,11 +188,11 @@ public class ExecuteScript extends AbstractScriptProcessor {
// Execute any engine-specific configuration before the script is evaluated
ScriptEngineConfigurator configurator =
scriptEngineConfiguratorMap.get(scriptEngineName);
scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.eval(scriptEngine, scriptToRun, modulePath);
configurator.eval(scriptEngine, scriptToRun, modules);
} else {
scriptEngine.eval(scriptToRun);
}

View File

@ -46,7 +46,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -229,7 +228,12 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
modulePath = context.getProperty(MODULES).evaluateAttributeExpressions().getValue();
String modulePath = context.getProperty(MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
setup();
}
@ -294,18 +298,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final ProcessorLog logger = getLogger();
final String message = "Unable to load script: " + e;
// If the module path has not yet been set, then this script is likely being loaded too early and depends
// on modules the processor does not yet know about. If this is the case, it will be reloaded later on
// property change (modules) or when scheduled
if (modulePath != null) {
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.build());
}
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.build());
}
// store the updated validation results
@ -364,9 +363,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final Invocable invocable = (Invocable) scriptEngine;
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName);
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
if (configurator != null) {
configurator.eval(scriptEngine, scriptBody, modulePath);
configurator.eval(scriptEngine, scriptBody, modules);
} else {
// evaluate the script
scriptEngine.eval(scriptBody);
@ -449,19 +448,20 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
// Verify that exactly one of "script file" or "script body" is set
Map<PropertyDescriptor, String> propertyMap = context.getProperties();
if (StringUtils.isEmpty(propertyMap.get(SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(SCRIPT_BODY))) {
Set<ValidationResult> results = new HashSet<>();
results.add(new ValidationResult.Builder().valid(false).explanation(
"Exactly one of Script File or Script Body must be set").build());
return results;
Collection<ValidationResult> commonValidationResults = super.customValidate(context);
if(!commonValidationResults.isEmpty()) {
return commonValidationResults;
}
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
modulePath = context.getProperty(MODULES).evaluateAttributeExpressions().getValue();
String modulePath = context.getProperty(MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
setup();
// Now that InvokeScriptedProcessor is validated, we can call validate on the scripted processor

View File

@ -17,8 +17,11 @@
package org.apache.nifi.processors.script;
import org.apache.nifi.logging.ComponentLog;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.net.URL;
/**
* This interface describes callback methods used by the ExecuteScript/InvokeScript processors to perform
@ -28,8 +31,10 @@ public interface ScriptEngineConfigurator {
String getScriptEngineName();
Object init(ScriptEngine engine, String modulePath) throws ScriptException;
URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log);
Object eval(ScriptEngine engine, String scriptBody, String modulePath) throws ScriptException;
Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException;
Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
/**
* This base class provides a common implementation for the getModuleURLsForClasspath method of the
* ScriptEngineConfigurator interface
*/
public abstract class AbstractModuleClassloaderConfigurator implements ScriptEngineConfigurator {
/**
* Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list
* of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar).
* Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories
* full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each
* JAR's URL.
* @param modulePaths An array of module paths to scan/add
* @param log A logger for the calling component, to provide feedback for missing files, e.g.
* @return An array of URLs corresponding to all modules determined from the input set of module paths.
*/
@Override
public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) {
List<URL> additionalClasspath = new LinkedList<>();
if (modulePaths != null) {
for (String modulePathString : modulePaths) {
File modulePath = new File(modulePathString);
if (modulePath.exists()) {
// Add the URL of this path
try {
additionalClasspath.add(modulePath.toURI().toURL());
} catch (MalformedURLException mue) {
log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
}
// If the path is a directory, we need to scan for JARs and add them to the classpath
if (modulePath.isDirectory()) {
File[] jarFiles = modulePath.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return (name != null && name.endsWith(".jar"));
}
});
if (jarFiles != null) {
// Add each to the classpath
for (File jarFile : jarFiles) {
try {
additionalClasspath.add(jarFile.toURI().toURL());
} catch (MalformedURLException mue) {
log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
}
}
}
}
} else {
log.warn("{} does not exist, ignoring", new Object[]{modulePath.getAbsolutePath()});
}
}
}
return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
}
}

View File

@ -16,12 +16,10 @@
*/
package org.apache.nifi.processors.script.impl;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
public class GroovyScriptEngineConfigurator implements ScriptEngineConfigurator {
public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
private static final String PRELOADS =
"import org.apache.nifi.components.*\n"
@ -41,14 +39,16 @@ public class GroovyScriptEngineConfigurator implements ScriptEngineConfigurator
return "Groovy";
}
@Override
public Object init(ScriptEngine engine, String modulePath) throws ScriptException {
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
scriptEngine = engine;
return scriptEngine;
}
@Override
public Object eval(ScriptEngine engine, String scriptBody, String modulePath) throws ScriptException {
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
scriptEngine = engine;
return engine.eval(PRELOADS + scriptBody);
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
/**
* This class offers methods to perform Javascript-specific operations during the script engine lifecycle.
*/
public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
@Override
public String getScriptEngineName() {
return "ECMAScript";
}
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
// No initialization methods needed at present
return engine;
}
@Override
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
return engine.eval(scriptBody);
}
}

View File

@ -16,33 +16,45 @@
*/
package org.apache.nifi.processors.script.impl;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.net.URL;
/**
* A helper class to configure the Jython engine with any specific requirements
*/
public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator {
@Override
public String getScriptEngineName() {
return "python";
}
@Override
public Object init(ScriptEngine engine, String modulePath) throws ScriptException {
public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) {
// We don't need to add the module paths to the classpath, they will be added via sys.path.append
return new URL[0];
}
@Override
public Object init(ScriptEngine engine, String[] modulePaths) throws ScriptException {
return null;
}
public Object eval(ScriptEngine engine, String scriptBody, String modulePath) throws ScriptException {
@Override
public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
Object returnValue = null;
if (engine != null) {
// Need to import the module path inside the engine, in order to pick up
// other Python/Jython modules
engine.eval("import sys");
if (modulePath != null) {
engine.eval("sys.path.append('" + modulePath + "')");
if (modulePaths != null) {
for (String modulePath : modulePaths) {
engine.eval("sys.path.append('" + modulePath + "')");
}
}
returnValue = engine.eval(scriptBody);
}

View File

@ -15,3 +15,4 @@
org.apache.nifi.processors.script.impl.JythonScriptEngineConfigurator
org.apache.nifi.processors.script.impl.GroovyScriptEngineConfigurator
org.apache.nifi.processors.script.impl.JavascriptScriptEngineConfigurator

View File

@ -103,7 +103,7 @@ public class TestExecuteGroovy extends BaseScriptTest {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger_newFlowFile.groovy");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/groovy");
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));