NIFI-3938: Added ScriptedLookupService, some refactor for reusable scripting classes

This closes #1828.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matt Burgess 2017-05-19 11:01:43 -04:00 committed by Andy LoPresto
parent d4f0c1d048
commit 9294a26139
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
33 changed files with 821 additions and 214 deletions

View File

@ -46,6 +46,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>

View File

@ -0,0 +1,375 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.script;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.script.AbstractScriptedControllerService;
import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
/**
* A Controller service that allows the user to script the lookup operation to be performed (by LookupRecord, e.g.)
*/
@Tags({"lookup", "record", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
@CapabilityDescription("Allows the user to provide a scripted LookupService instance in order to enrich records from an incoming flow file.")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class ScriptedLookupService extends AbstractScriptedControllerService implements LookupService<Object> {
protected final AtomicReference<LookupService<Object>> lookupService = new AtomicReference<>();
private volatile String kerberosServicePrincipal = null;
private volatile File kerberosConfigFile = null;
private volatile File kerberosServiceKeytab = null;
@Override
public Optional<Object> lookup(String key) throws LookupFailureException {
// Delegate the lookup() call to the scripted LookupService
return lookupService.get().lookup(key);
}
@Override
public Class<?> getValueType() {
// Delegate the getValueType() call to the scripted LookupService
return lookupService.get().getValueType();
}
@Override
protected void init(final ControllerServiceInitializationContext context) {
kerberosServicePrincipal = context.getKerberosServicePrincipal();
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosServiceKeytab = context.getKerberosServiceKeytab();
}
/**
* Returns a list of property descriptors supported by this processor. The
* list always includes properties such as script engine name, script file
* name, script body name, script arguments, and an external module path. If
* the scripted processor also defines supported properties, those are added
* to the list as well.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
final ConfigurableComponent instance = lookupService.get();
if (instance != null) {
try {
final List<PropertyDescriptor> instanceDescriptors = instance.getPropertyDescriptors();
if (instanceDescriptors != null) {
supportedPropertyDescriptors.addAll(instanceDescriptors);
}
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get property descriptors from Processor: " + t;
logger.error(message);
if (logger.isDebugEnabled()) {
logger.error(message, t);
}
}
}
return Collections.unmodifiableList(supportedPropertyDescriptors);
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to
* be able to define their own properties which will be available as
* variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors
* exist for that name
* @return a PropertyDescriptor object corresponding to the specified
* dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
final ComponentLog logger = getLogger();
final ConfigurableComponent instance = lookupService.get();
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptEngine = null;
}
} else if (instance != null) {
// If the script provides a ConfigurableComponent, call its onPropertyModified() method
try {
instance.onPropertyModified(descriptor, oldValue, newValue);
} catch (final Exception e) {
final String message = "Unable to invoke onPropertyModified from scripted LookupService: " + e;
logger.error(message, e);
}
}
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary
final Invocable invocable = (Invocable) scriptEngine;
if (configurationContext != null) {
try {
// Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
// where lookupService is a proxied interface
final Object obj = scriptEngine.get("lookupService");
if (obj != null) {
try {
invocable.invokeMethod(obj, "onEnabled", context);
} catch (final NoSuchMethodException nsme) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Configured script LookupService does not contain an onEnabled() method.");
}
}
} else {
throw new ScriptException("No LookupService was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onEnabled(context) method", se);
}
}
}
@OnDisabled
public void onDisabled(final ConfigurationContext context) {
// Call an non-interface method onDisabled(context), to allow a scripted LookupService the chance to shut down as necessary
final Invocable invocable = (Invocable) scriptEngine;
if (configurationContext != null) {
try {
// Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
// where lookupService is a proxied interface
final Object obj = scriptEngine.get("lookupService");
if (obj != null) {
try {
invocable.invokeMethod(obj, "onDisabled", context);
} catch (final NoSuchMethodException nsme) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Configured script LookupService does not contain an onDisabled() method.");
}
}
} else {
throw new ScriptException("No LookupService was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onDisabled(context) method", se);
}
}
}
public void setup() {
// Create a single script engine, the Processor object is reused by each task
if (scriptEngine == null) {
scriptingComponentHelper.setup(1, getLogger());
scriptEngine = scriptingComponentHelper.engineQ.poll();
}
if (scriptEngine == null) {
throw new ProcessException("No script engine available!");
}
if (scriptNeedsReload.get() || lookupService.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
} else {
reloadScriptBody(scriptingComponentHelper.getScriptBody());
}
scriptNeedsReload.set(false);
}
}
/**
* Reloads the script RecordReaderFactory. This must be called within the lock.
*
* @param scriptBody An input stream associated with the script content
* @return Whether the script was successfully reloaded
*/
protected boolean reloadScript(final String scriptBody) {
// note we are starting here with a fresh listing of validation
// results since we are (re)loading a new/updated script. any
// existing validation results are not relevant
final Collection<ValidationResult> results = new HashSet<>();
try {
// get the engine and ensure its invocable
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script
scriptEngine.eval(scriptBody);
}
// get configured LookupService from the script (if it exists)
final Object obj = scriptEngine.get("lookupService");
if (obj != null) {
final ComponentLog logger = getLogger();
try {
// set the logger if the processor wants it
invocable.invokeMethod(obj, "setLogger", logger);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Scripted LookupService does not contain a setLogger method.");
}
}
// record the processor for use later
final LookupService<Object> scriptedLookupService = invocable.getInterface(obj, LookupService.class);
lookupService.set(scriptedLookupService);
if (scriptedLookupService != null) {
try {
scriptedLookupService.initialize(new ControllerServiceInitializationContext() {
@Override
public String getIdentifier() {
return ScriptedLookupService.this.getIdentifier();
}
@Override
public ComponentLog getLogger() {
return logger;
}
@Override
public StateManager getStateManager() {
return ScriptedLookupService.this.getStateManager();
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return ScriptedLookupService.super.getControllerServiceLookup();
}
@Override
public String getKerberosServicePrincipal() {
return ScriptedLookupService.this.kerberosServicePrincipal;
}
@Override
public File getKerberosServiceKeytab() {
return ScriptedLookupService.this.kerberosServiceKeytab;
}
@Override
public File getKerberosConfigurationFile() {
return ScriptedLookupService.this.kerberosConfigFile;
}
});
} catch (final Exception e) {
logger.error("Unable to initialize scripted LookupService: " + e.getLocalizedMessage(), e);
throw new ProcessException(e);
}
}
} else {
throw new ScriptException("No LookupService was defined by the script.");
}
} else {
throw new ScriptException("Script engine is not Invocable, cannot be used for ScriptedLookupService");
}
} catch (final Exception ex) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + ex.getLocalizedMessage();
logger.error(message, ex);
results.add(new ValidationResult.Builder()
.subject("ScriptedLookupServiceValidation")
.valid(false)
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
}

View File

@ -38,6 +38,8 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import java.nio.charset.Charset;
import javax.script.Bindings;

View File

@ -41,6 +41,8 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import javax.script.Invocable;
import javax.script.ScriptEngine;

View File

@ -16,130 +16,19 @@
*/
package org.apache.nifi.record.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptingComponentHelper;
import org.apache.nifi.processors.script.ScriptingComponentUtils;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.AbstractScriptedControllerService;
import javax.script.ScriptEngine;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* An abstract base class containing code common to the Scripted record reader/writer implementations
*/
public abstract class AbstractScriptedRecordFactory<T> extends AbstractControllerService {
public abstract class AbstractScriptedRecordFactory<T> extends AbstractScriptedControllerService {
protected final AtomicReference<T> recordFactory = new AtomicReference<>();
protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
protected volatile ScriptEngine scriptEngine = null;
protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
protected volatile ConfigurationContext configurationContext = null;
/**
* Returns a list of property descriptors supported by this record reader. The
* list always includes properties such as script engine name, script file
* name, script body name, script arguments, and an external module path.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
return Collections.unmodifiableList(supportedPropertyDescriptors);
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to
* be able to define their own properties which will be available as
* variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors
* exist for that name
* @return a PropertyDescriptor object corresponding to the specified
* dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptEngine = null;
}
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
public void onEnabled(final ConfigurationContext context) {
this.configurationContext = context;
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(modulePath)) {
scriptingComponentHelper.setModules(modulePath.split(","));
} else {
scriptingComponentHelper.setModules(new String[0]);
}
setup();
}
public void setup() {
// Create a single script engine, the Processor object is reused by each task
if (scriptEngine == null) {
@ -160,69 +49,4 @@ public abstract class AbstractScriptedRecordFactory<T> extends AbstractControlle
scriptNeedsReload.set(false);
}
}
/**
* Reloads the script located at the given path
*
* @param scriptPath the path to the script file to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
private boolean reloadScriptFile(final String scriptPath) {
final Collection<ValidationResult> results = new HashSet<>();
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset()));
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
/**
* Reloads the script defined by the given string
*
* @param scriptBody the contents of the script to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
private boolean reloadScriptBody(final String scriptBody) {
final Collection<ValidationResult> results = new HashSet<>();
try {
return reloadScript(scriptBody);
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
protected abstract boolean reloadScript(final String scriptBody);
}

View File

@ -31,11 +31,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.processors.script.ScriptingComponentHelper;
import org.apache.nifi.processors.script.ScriptingComponentUtils;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.util.StringUtils;
import javax.script.Bindings;
import javax.script.ScriptContext;
@ -119,15 +117,8 @@ public class ScriptedReportingTask extends AbstractReportingTask {
*/
@OnScheduled
public void setup(final ConfigurationContext context) {
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(modulePath)) {
scriptingComponentHelper.setModules(modulePath.split(","));
} else {
scriptingComponentHelper.setModules(new String[0]);
}
scriptingComponentHelper.setupVariables(context);
// Create a script engine for each possible task
scriptingComponentHelper.setup(1, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import javax.script.ScriptEngine;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* An abstract class with common methods and variables for reuse among Controller Services
*/
public abstract class AbstractScriptedControllerService extends AbstractControllerService {
protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
protected volatile ScriptEngine scriptEngine = null;
protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
protected volatile ConfigurationContext configurationContext = null;
/**
* Returns a list of property descriptors supported by this record reader. The
* list always includes properties such as script engine name, script file
* name, script body name, script arguments, and an external module path.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
return Collections.unmodifiableList(supportedPropertyDescriptors);
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to
* be able to define their own properties which will be available as
* variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors
* exist for that name
* @return a PropertyDescriptor object corresponding to the specified
* dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptEngine = null;
}
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
public void onEnabled(final ConfigurationContext context) {
this.configurationContext = context;
scriptingComponentHelper.setupVariables(context);
setup();
}
abstract public void setup();
/**
* Reloads the script located at the given path
*
* @param scriptPath the path to the script file to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
protected boolean reloadScriptFile(final String scriptPath) {
final Collection<ValidationResult> results = new HashSet<>();
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset()));
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
/**
* Reloads the script defined by the given string
*
* @param scriptBody the contents of the script to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
protected boolean reloadScriptBody(final String scriptBody) {
final Collection<ValidationResult> results = new HashSet<>();
try {
return reloadScript(scriptBody);
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
protected abstract boolean reloadScript(final String scriptBody);
}

View File

@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
package org.apache.nifi.script;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import java.io.File;
@ -48,6 +49,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.util.StringUtils;
/**
@ -281,7 +283,19 @@ public class ScriptingComponentHelper {
}
}
void setupVariables(ProcessContext context) {
public void setupVariables(ProcessContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
}
public void setupVariables(ConfigurationContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
@ -310,7 +324,7 @@ public class ScriptingComponentHelper {
return factory.getScriptEngine();
}
void stop() {
public void stop() {
if (engineQ != null) {
engineQ.clear();
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
package org.apache.nifi.script;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
package org.apache.nifi.script.impl;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
package org.apache.nifi.script.impl;
import org.apache.nifi.processors.script.engine.ClojureScriptEngine;
@ -37,8 +37,10 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo
+ "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n"
+ "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n"
+ "[org.apache.nifi.processor.util FlowFileFilters StandardValidators]\n"
+ "[org.apache.nifi.processors.script ScriptingComponentHelper ScriptingComponentUtils ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n"
+ "[org.apache.nifi.processors.script ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n"
+ "[org.apache.nifi.script ScriptingComponentHelper ScriptingComponentUtils]\n"
+ "[org.apache.nifi.logging ComponentLog]\n"
+ "[org.apache.nifi.lookup LookupService RecordLookupService StringLookupService LookupFailureException]\n"
+ ")\n";

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
package org.apache.nifi.script.impl;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
@ -29,8 +29,9 @@ public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderCon
+ "import org.apache.nifi.processor.io.*\n"
+ "import org.apache.nifi.processor.util.*\n"
+ "import org.apache.nifi.processors.script.*\n"
+ "import org.apache.nifi.logging.ComponentLog\n";
+ "import org.apache.nifi.logging.ComponentLog\n"
+ "import org.apache.nifi.script.*\n"
+ "import org.apache.nifi.lookup.*\n";
private ScriptEngine scriptEngine;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
package org.apache.nifi.script.impl;
import javax.script.ScriptEngine;
import javax.script.ScriptException;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script.impl;
package org.apache.nifi.script.impl;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;

View File

@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.record.script.ScriptedReader
org.apache.nifi.record.script.ScriptedRecordSetWriter
org.apache.nifi.record.script.ScriptedRecordSetWriter
org.apache.nifi.lookup.script.ScriptedLookupService

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.script.impl.ClojureScriptEngineConfigurator
org.apache.nifi.processors.script.impl.JythonScriptEngineConfigurator
org.apache.nifi.processors.script.impl.GroovyScriptEngineConfigurator
org.apache.nifi.processors.script.impl.JavascriptScriptEngineConfigurator
org.apache.nifi.script.impl.ClojureScriptEngineConfigurator
org.apache.nifi.script.impl.JythonScriptEngineConfigurator
org.apache.nifi.script.impl.GroovyScriptEngineConfigurator
org.apache.nifi.script.impl.JavascriptScriptEngineConfigurator

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.script
import org.apache.commons.io.FileUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static junit.framework.TestCase.assertEquals
import static org.junit.Assert.assertFalse
import static org.junit.Assert.assertTrue
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
/**
* Unit tests for the ScriptedLookupService controller service
*/
class TestScriptedLookupService {
private static final Logger logger = LoggerFactory.getLogger(TestScriptedLookupService)
ScriptedLookupService scriptedLookupService
def scriptingComponent
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = {String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
}
@Before
void setUp() {
scriptedLookupService = new MockScriptedLookupService()
scriptingComponent = (AccessibleScriptingComponentHelper) scriptedLookupService
}
@Test
void testLookupServiceGroovyScript() {
def properties = [:] as Map<PropertyDescriptor, String>
scriptedLookupService.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_lookup_inline.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
def logger = mock(ComponentLog)
def initContext = mock(ControllerServiceInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
scriptedLookupService.initialize initContext
scriptedLookupService.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
Optional opt = scriptedLookupService.lookup('Hello')
assertTrue(opt.present)
assertEquals('Hi', opt.get())
opt = scriptedLookupService.lookup('World')
assertTrue(opt.present)
assertEquals('there', opt.get())
opt = scriptedLookupService.lookup('Not There')
assertFalse(opt.present)
}
class MockScriptedLookupService extends ScriptedLookupService implements AccessibleScriptingComponentHelper {
@Override
ScriptingComponentHelper getScriptingComponentHelper() {
return this.@scriptingComponentHelper
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.StopWatch
import org.apache.nifi.util.TestRunners

View File

@ -23,8 +23,8 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockFlowFile

View File

@ -22,8 +22,8 @@ import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord

View File

@ -22,8 +22,8 @@ import org.apache.nifi.components.PropertyValue
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.provenance.ProvenanceEventBuilder
import org.apache.nifi.provenance.ProvenanceEventRecord
import org.apache.nifi.provenance.ProvenanceEventRepository

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentHelper;
/**
* An interface for retrieving the scripting component helper for a scripting processor. Aids in testing (for setting the Script Engine descriptor, for example).
*/

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.script;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.script;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.script;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.script;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.reporting.InitializationException
class GroovyLookupService implements LookupService<String> {
def lookupTable = [
'Hello': 'Hi',
'World': 'there'
]
@Override
Optional<String> lookup(String key) {
Optional.ofNullable(lookupTable[key])
}
@Override
Class<?> getValueType() {
return String
}
@Override
void initialize(ControllerServiceInitializationContext context) throws InitializationException {
}
@Override
Collection<ValidationResult> validate(ValidationContext context) {
return null
}
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
return null
}
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
return null
}
@Override
String getIdentifier() {
return null
}
}
lookupService = new GroovyLookupService()