NIFI-1458: Added ScriptedReportingTask

This closes #1045.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matt Burgess 2016-10-14 13:41:37 -04:00 committed by Andy LoPresto
parent 31ec01b5f9
commit 675f4f544c
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
22 changed files with 885 additions and 249 deletions

View File

@ -33,6 +33,20 @@ The following binary components are provided under the Apache Software License v
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Yammer Metrics
The following NOTICE information applies:
Metrics
Copyright 2010-2012 Coda Hale and Yammer, Inc.
This product includes software developed by Coda Hale and Yammer, Inc.
This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
with the following comments:
Written by Doug Lea with assistance from members of JCP JSR-166
Expert Group and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/
******************
Eclipse Public License v1.0
******************

View File

@ -65,6 +65,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -16,22 +16,30 @@
*/
package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.nio.charset.Charset;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
@ -39,6 +47,7 @@ import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -57,9 +66,18 @@ import java.util.Set;
description = "Updates a script engine property specified by the Dynamic Property's key with the value "
+ "specified by the Dynamic Property's value")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class ExecuteScript extends AbstractScriptProcessor {
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
description = "Scripts can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
@SeeAlso({InvokeScriptedProcessor.class})
public class ExecuteScript extends AbstractSessionFactoryProcessor {
// Constants maintained for backwards compatibility
public static final Relationship REL_SUCCESS = ScriptingComponentUtils.REL_SUCCESS;
public static final Relationship REL_FAILURE = ScriptingComponentUtils.REL_FAILURE;
private String scriptToRun = null;
volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
/**
* Returns the valid relationships for this processor.
@ -83,13 +101,13 @@ public class ExecuteScript extends AbstractScriptProcessor {
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (isInitialized) {
if (!isInitialized.get()) {
createResources();
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
return Collections.unmodifiableList(descriptors);
return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors());
}
/**
@ -110,6 +128,10 @@ public class ExecuteScript extends AbstractScriptProcessor {
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
@ -119,30 +141,22 @@ public class ExecuteScript extends AbstractScriptProcessor {
*/
@OnScheduled
public void setup(final ProcessContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
String modulePath = context.getProperty(MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
scriptingComponentHelper.setupVariables(context);
// Create a script engine for each possible task
int maxTasks = context.getMaxConcurrentTasks();
super.setup(maxTasks);
scriptToRun = scriptBody;
scriptingComponentHelper.setup(maxTasks, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();
try {
if (scriptToRun == null && scriptPath != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
scriptToRun = IOUtils.toString(scriptStream);
if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptingComponentHelper.getScriptPath())) {
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
}
/**
@ -158,12 +172,12 @@ public class ExecuteScript extends AbstractScriptProcessor {
*/
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
synchronized (isInitialized) {
if (!isInitialized.get()) {
createResources();
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
ScriptEngine scriptEngine = engineQ.poll();
ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
ComponentLog log = getLogger();
if (scriptEngine == null) {
// No engine available so nothing more to do here
@ -197,11 +211,11 @@ public class ExecuteScript extends AbstractScriptProcessor {
// Execute any engine-specific configuration before the script is evaluated
ScriptEngineConfigurator configurator =
scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.eval(scriptEngine, scriptToRun, modules);
configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
} else {
scriptEngine.eval(scriptToRun);
}
@ -219,7 +233,12 @@ public class ExecuteScript extends AbstractScriptProcessor {
session.rollback(true);
throw t;
} finally {
engineQ.offer(scriptEngine);
scriptingComponentHelper.engineQ.offer(scriptEngine);
}
}
@OnStopped
public void stop() {
scriptingComponentHelper.stop();
}
}

View File

@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -28,9 +29,11 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
@ -44,6 +47,7 @@ import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -61,9 +65,11 @@ import java.util.concurrent.atomic.AtomicReference;
+ "Experimental: Impact of sustained usage not yet verified.")
@DynamicProperty(name = "A script engine property to update", value = "The value to set it to", supportsExpressionLanguage = true,
description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
description = "Scripts can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
@SeeAlso({ExecuteScript.class})
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class InvokeScriptedProcessor extends AbstractScriptProcessor {
public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
private final AtomicReference<Processor> processor = new AtomicReference<>();
private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
@ -74,6 +80,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private volatile String kerberosServicePrincipal = null;
private volatile File kerberosConfigFile = null;
private volatile File kerberosServiceKeytab = null;
volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
/**
* Returns the valid relationships for this processor as supplied by the
@ -123,13 +130,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (isInitialized) {
if (!isInitialized.get()) {
createResources();
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
supportedPropertyDescriptors.addAll(descriptors);
supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
final Processor instance = processor.get();
if (instance != null) {
@ -182,23 +189,15 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
*/
@OnScheduled
public void setup(final ProcessContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
String modulePath = context.getProperty(MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
scriptingComponentHelper.setupVariables(context);
setup();
}
public void setup() {
// Create a single script engine, the Processor object is reused by each task
if(scriptEngine == null) {
super.setup(1);
scriptEngine = engineQ.poll();
scriptingComponentHelper.setup(1, getLogger());
scriptEngine = scriptingComponentHelper.engineQ.poll();
}
if (scriptEngine == null) {
@ -206,10 +205,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
}
if (scriptNeedsReload.get() || processor.get() == null) {
if (isFile(scriptPath)) {
reloadScriptFile(scriptPath);
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
} else {
reloadScriptBody(scriptBody);
reloadScriptBody(scriptingComponentHelper.getScriptBody());
}
scriptNeedsReload.set(false);
}
@ -228,13 +227,13 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final ComponentLog logger = getLogger();
final Processor instance = processor.get();
if (SCRIPT_FILE.equals(descriptor)
|| SCRIPT_BODY.equals(descriptor)
|| MODULES.equals(descriptor)
|| SCRIPT_ENGINE.equals(descriptor)) {
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (SCRIPT_ENGINE.equals(descriptor)) {
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptEngine = null;
}
} else if (instance != null) {
@ -258,7 +257,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final Collection<ValidationResult> results = new HashSet<>();
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
return reloadScript(IOUtils.toString(scriptStream));
return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset()));
} catch (final Exception e) {
final ComponentLog logger = getLogger();
@ -300,7 +299,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.input(scriptingComponentHelper.getScriptPath())
.build());
}
@ -329,9 +328,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final Invocable invocable = (Invocable) scriptEngine;
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.eval(scriptEngine, scriptBody, modules);
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script
scriptEngine.eval(scriptBody);
@ -412,7 +411,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
.input(scriptPath)
.input(scriptingComponentHelper.getScriptPath())
.build());
}
@ -442,14 +441,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
return commonValidationResults;
}
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(SCRIPT_BODY).getValue();
String modulePath = context.getProperty(MODULES).getValue();
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
scriptingComponentHelper.setModules(modulePath.split(","));
} else {
modules = new String[0];
scriptingComponentHelper.setModules(new String[0]);
}
setup();
@ -477,7 +476,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
.subject("Validation")
.valid(false)
.explanation("An error occurred calling validate in the configured script Processor.")
.input(context.getProperty(SCRIPT_FILE).getValue())
.input(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue())
.build());
return results;
}
@ -505,9 +504,9 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// Initialize the rest of the processor resources if we have not already done so
synchronized (isInitialized) {
if (!isInitialized.get()) {
super.createResources();
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
@ -529,7 +528,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
// run the processor
instance.onTrigger(context, sessionFactory);
} catch (final ProcessException e) {
final String message = String.format("An error occurred executing the configured Processor [%s]: %s", context.getProperty(SCRIPT_FILE).getValue(), e);
final String message = String.format("An error occurred executing the configured Processor [%s]: %s",
context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue(), e);
log.error(message);
throw e;
}
@ -539,9 +539,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
}
@OnStopped
@Override
public void stop() {
super.stop();
scriptingComponentHelper.stop();
processor.set(null);
scriptEngine = null;
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import java.io.File;
@ -29,7 +27,6 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -46,72 +43,72 @@ import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.StringUtils;
/**
* This class contains variables and methods common to scripting processors
* This class contains variables and methods common to scripting processors, reporting tasks, etc.
*/
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
description = "Scripts can store and retrieve state using the State Management APIs. Consult the State Manager section of the Developer's Guide for more details.")
public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProcessor {
public class ScriptingComponentHelper {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that were successfully processed")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to be processed")
.build();
public static PropertyDescriptor SCRIPT_ENGINE;
public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder()
.name("Script File")
.required(false)
.description("Path to script file to execute. Only one of Script File or Script Body may be used")
.addValidator(new StandardValidators.FileExistsValidator(true))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder()
.name("Script Body")
.required(false)
.description("Body of script to execute. Only one of Script File or Script Body may be used")
.addValidator(Validator.VALID)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
.name("Module Directory")
.description("Comma-separated list of paths to files and/or directories which contain modules required by the script.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public PropertyDescriptor SCRIPT_ENGINE;
// A map from engine name to a custom configurator for that engine
protected final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>();
protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
public final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>();
public final AtomicBoolean isInitialized = new AtomicBoolean(false);
protected Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
protected String scriptEngineName;
protected String scriptPath;
protected String scriptBody;
protected String[] modules;
protected List<PropertyDescriptor> descriptors;
public Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
private String scriptEngineName;
private String scriptPath;
private String scriptBody;
private String[] modules;
private List<PropertyDescriptor> descriptors;
protected BlockingQueue<ScriptEngine> engineQ = null;
public BlockingQueue<ScriptEngine> engineQ = null;
public String getScriptEngineName() {
return scriptEngineName;
}
public void setScriptEngineName(String scriptEngineName) {
this.scriptEngineName = scriptEngineName;
}
public String getScriptPath() {
return scriptPath;
}
public void setScriptPath(String scriptPath) {
this.scriptPath = scriptPath;
}
public String getScriptBody() {
return scriptBody;
}
public void setScriptBody(String scriptBody) {
this.scriptBody = scriptBody;
}
public String[] getModules() {
return modules;
}
public void setModules(String[] modules) {
this.modules = modules;
}
public List<PropertyDescriptor> getDescriptors() {
return descriptors;
}
public void setDescriptors(List<PropertyDescriptor> descriptors) {
this.descriptors = descriptors;
}
/**
* Custom validation for ensuring exactly one of Script File or Script Body is populated
@ -121,13 +118,12 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* for operating on those values
* @return A collection of validation results
*/
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
public Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Set<ValidationResult> results = new HashSet<>();
// Verify that exactly one of "script file" or "script body" is set
Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
if (StringUtils.isEmpty(propertyMap.get(SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(SCRIPT_BODY))) {
if (StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_FILE)) == StringUtils.isEmpty(propertyMap.get(ScriptingComponentUtils.SCRIPT_BODY))) {
results.add(new ValidationResult.Builder().valid(false).explanation(
"Exactly one of Script File or Script Body must be set").build());
}
@ -139,7 +135,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* This method creates all resources needed for the script processor to function, such as script engines,
* script file reloader threads, etc.
*/
protected void createResources() {
public void createResources() {
descriptors = new ArrayList<>();
// The following is required for JRuby, should be transparent to everything else.
// Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The
@ -158,9 +154,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
}
// Sort the list by name so the list always looks the same.
Collections.sort(engineList, new Comparator<AllowableValue>() {
@Override
public int compare(AllowableValue o1, AllowableValue o2) {
Collections.sort(engineList, (o1, o2) -> {
if (o1 == null) {
return o2 == null ? 0 : 1;
}
@ -168,7 +162,6 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
return -1;
}
return o1.getValue().compareTo(o2.getValue());
}
});
AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]);
@ -185,9 +178,9 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
descriptors.add(SCRIPT_ENGINE);
}
descriptors.add(SCRIPT_FILE);
descriptors.add(SCRIPT_BODY);
descriptors.add(MODULES);
descriptors.add(ScriptingComponentUtils.SCRIPT_FILE);
descriptors.add(ScriptingComponentUtils.SCRIPT_BODY);
descriptors.add(ScriptingComponentUtils.MODULES);
isInitialized.set(true);
}
@ -198,7 +191,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* @param path a path to a file
* @return true if the path refers to a valid file, false otherwise
*/
protected boolean isFile(final String path) {
public static boolean isFile(final String path) {
return path != null && Files.isRegularFile(Paths.get(path));
}
@ -208,7 +201,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
*
* @param numberOfScriptEngines number of engines to setup
*/
public void setup(int numberOfScriptEngines) {
public void setup(int numberOfScriptEngines, ComponentLog log) {
if (scriptEngineConfiguratorMap.isEmpty()) {
ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
@ -217,7 +210,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator);
}
}
setupEngines(numberOfScriptEngines);
setupEngines(numberOfScriptEngines, log);
}
/**
@ -228,12 +221,10 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* @param numberOfScriptEngines number of engines to setup
* @see org.apache.nifi.processors.script.ScriptEngineConfigurator
*/
protected void setupEngines(int numberOfScriptEngines) {
protected void setupEngines(int numberOfScriptEngines, ComponentLog log) {
engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines);
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
ComponentLog log = getLogger();
if (StringUtils.isBlank(scriptEngineName)) {
throw new IllegalArgumentException("The script engine name cannot be null");
}
@ -290,6 +281,18 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
}
}
void setupVariables(ProcessContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
}
/**
* Provides a ScriptEngine corresponding to the currently selected script engine name.
* ScriptEngineManager.getEngineByName() doesn't use find ScriptEngineFactory.getName(), which
@ -307,8 +310,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
return factory.getScriptEngine();
}
@OnStopped
public void stop() {
void stop() {
if (engineQ != null) {
engineQ.clear();
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
/**
* Utility methods and constants used by the scripting components.
*/
public class ScriptingComponentUtils {
/** A relationship indicating flow files were processed successfully */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that were successfully processed")
.build();
/** A relationship indicating an error while processing flow files */
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to be processed")
.build();
/** A property descriptor for specifying the location of a script file */
public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder()
.name("Script File")
.required(false)
.description("Path to script file to execute. Only one of Script File or Script Body may be used")
.addValidator(new StandardValidators.FileExistsValidator(true))
.expressionLanguageSupported(true)
.build();
/** A property descriptor for specifying the body of a script */
public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder()
.name("Script Body")
.required(false)
.description("Body of script to execute. Only one of Script File or Script Body may be used")
.addValidator(Validator.VALID)
.expressionLanguageSupported(false)
.build();
/** A property descriptor for specifying the location of additional modules to be used by the script */
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
.name("Module Directory")
.description("Comma-separated list of paths to files and/or directories which contain modules required by the script.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.script;
import com.yammer.metrics.core.VirtualMachineMetrics;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.processors.script.ScriptingComponentHelper;
import org.apache.nifi.processors.script.ScriptingComponentUtils;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.util.StringUtils;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A Reporting task whose body is provided by a script (via supported JSR-223 script engines)
*/
@Tags({"reporting", "script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
@CapabilityDescription("Provides reporting and status information to a script. ReportingContext, ComponentLog, and VirtualMachineMetrics objects are made available "
+ "as variables (context, log, and vmMetrics, respectively) to the script for further processing. The context makes various information available such "
+ "as events, provenance, bulletins, controller services, process groups, Java Virtual Machine metrics, etc.")
@DynamicProperty(
name = "A script engine property to update",
value = "The value to set it to",
supportsExpressionLanguage = true,
description = "Updates a script engine property specified by the Dynamic Property's key with the value "
+ "specified by the Dynamic Property's value")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class ScriptedReportingTask extends AbstractReportingTask {
protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
private volatile String scriptToRun = null;
private volatile VirtualMachineMetrics vmMetrics;
/**
* Returns a list of property descriptors supported by this processor. The list always includes properties such as
* script engine name, script file name, script body name, script arguments, and an external module path. If the
* scripted processor also defines supported properties, those are added to the list as well.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors());
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties
* which will be available as variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors exist for that name
* @return a PropertyDescriptor object corresponding to the specified dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
* properties, as well as reloading the script (from file or the "Script Body" property)
*
* @param context the context in which to perform the setup operations
*/
@OnScheduled
public void setup(final ConfigurationContext context) {
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).getValue();
if (!StringUtils.isEmpty(modulePath)) {
scriptingComponentHelper.setModules(modulePath.split(","));
} else {
scriptingComponentHelper.setModules(new String[0]);
}
// Create a script engine for each possible task
scriptingComponentHelper.setup(1, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();
try {
String scriptPath = scriptingComponentHelper.getScriptPath();
if (scriptToRun == null && scriptPath != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
vmMetrics = VirtualMachineMetrics.getInstance();
}
@Override
public void onTrigger(final ReportingContext context) {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
ComponentLog log = getLogger();
if (scriptEngine == null) {
// No engine available so nothing more to do here
return;
}
try {
try {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
if (bindings == null) {
bindings = new SimpleBindings();
}
bindings.put("context", context);
bindings.put("log", log);
bindings.put("vmMetrics", vmMetrics);
// Find the user-added properties and set them on the script
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
if (property.getKey().isDynamic()) {
// Add the dynamic property bound to its full PropertyValue to the script engine
if (property.getValue() != null) {
bindings.put(property.getKey().getName(), context.getProperty(property.getKey()));
}
}
}
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
// Execute any engine-specific configuration before the script is evaluated
ScriptEngineConfigurator configurator =
scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
// Evaluate the script with the configurator (if it exists) or the engine
if (configurator != null) {
configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
} else {
scriptEngine.eval(scriptToRun);
}
} catch (ScriptException e) {
throw new ProcessException(e);
}
} catch (final Throwable t) {
// Mimic AbstractProcessor behavior here
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
throw t;
} finally {
scriptingComponentHelper.engineQ.offer(scriptEngine);
}
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.script.ScriptedReportingTask

View File

@ -49,9 +49,9 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
super.setupExecuteScript()
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy")
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy")
}
@After
@ -65,9 +65,9 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
assertNotNull(executeScript.getSupportedPropertyDescriptors())
runner = TestRunners.newTestRunner(executeScript)
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy")
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy")
// Override userContext value
runner.processContext.maxConcurrentTasks = poolSize

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.script
import org.apache.commons.io.FileUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.PropertyValue
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.provenance.ProvenanceEventBuilder
import org.apache.nifi.provenance.ProvenanceEventRecord
import org.apache.nifi.provenance.ProvenanceEventRepository
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.provenance.StandardProvenanceEventRecord
import org.apache.nifi.reporting.EventAccess
import org.apache.nifi.reporting.ReportingContext
import org.apache.nifi.reporting.ReportingInitializationContext
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mockito.Mockito
import org.mockito.stubbing.Answer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertTrue
import static org.mockito.Mockito.any
import static org.mockito.Mockito.doAnswer
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
/**
* Unit tests for ScriptedReportingTask.
*/
@RunWith(JUnit4.class)
class ScriptedReportingTaskGroovyTest {
private static final Logger logger = LoggerFactory.getLogger(ScriptedReportingTaskGroovyTest)
def task
def runner
def scriptingComponent
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
}
@Before
void setUp() {
task = new MockScriptedReportingTask()
runner = TestRunners
scriptingComponent = (AccessibleScriptingComponentHelper) task
}
@Test
void testProvenanceGroovyScript() {
def uuid = "10000000-0000-0000-0000-000000000000"
def attributes = ['abc': 'xyz', 'xyz': 'abc', 'filename': "file-$uuid", 'uuid': uuid]
def prevAttrs = ['filename': '1234.xyz']
def flowFile = new MockFlowFile(3L)
flowFile.putAttributes(attributes)
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.eventTime = System.currentTimeMillis()
builder.eventType = ProvenanceEventType.RECEIVE
builder.transitUri = 'nifi://unit-test'
builder.setAttributes(prevAttrs, attributes)
builder.componentId = '1234'
builder.componentType = 'dummy processor'
builder.fromFlowFile(flowFile)
final ProvenanceEventRecord event = builder.build()
def properties = task.supportedPropertyDescriptors.collectEntries { descriptor ->
[descriptor: descriptor.getDefaultValue()]
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_log_provenance_events.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
// Set up ReportingContext
def context = mock(ReportingContext)
when(context.getStateManager()).thenReturn(new MockStateManager(task))
doAnswer({ invocation ->
def descriptor = invocation.getArgumentAt(0, PropertyDescriptor)
return new MockPropertyValue(properties[descriptor])
} as Answer<PropertyValue>
).when(context).getProperty(any(PropertyDescriptor))
def eventAccess = mock(EventAccess)
// Return 3 events for the test
doAnswer({ invocation -> return [event, event, event] } as Answer<List<ProvenanceEventRecord>>
).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt())
def provenanceRepository = mock(ProvenanceEventRepository.class)
doAnswer({ invocation -> return 3 } as Answer<Long>
).when(provenanceRepository).getMaxEventId()
when(context.getEventAccess()).thenReturn(eventAccess);
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository)
def logger = mock(ComponentLog)
def initContext = mock(ReportingInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
task.initialize initContext
task.setup configurationContext
task.onTrigger context
// This script should return a variable x with the number of events and a variable e with the first event
def se = task.scriptEngine
assertEquals 3, se.x
assertEquals '1234', se.e.componentId
assertEquals 'xyz', se.e.attributes.abc
task.offerScriptEngine(se)
}
@Test
void testVMEventsGroovyScript() {
def properties = [:] as Map<PropertyDescriptor, String>
task.getSupportedPropertyDescriptors().each { PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_log_vm_stats.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
// Set up ReportingContext
def context = mock(ReportingContext)
when(context.getStateManager()).thenReturn(new MockStateManager(task))
doAnswer({ invocation ->
PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor)
return new MockPropertyValue(properties[descriptor])
} as Answer<PropertyValue>
).when(context).getProperty(any(PropertyDescriptor))
def logger = mock(ComponentLog)
def initContext = mock(ReportingInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
task.initialize initContext
task.setup configurationContext
task.onTrigger context
def se = task.scriptEngine
// This script should store a variable called x with a map of stats to values
assertTrue se.x?.uptime > 0
task.offerScriptEngine(se)
}
class MockScriptedReportingTask extends ScriptedReportingTask implements AccessibleScriptingComponentHelper {
def getScriptEngine() {
return scriptingComponentHelper.engineQ.poll()
}
def offerScriptEngine(engine) {
scriptingComponentHelper.engineQ.offer(engine)
}
@Override
ScriptingComponentHelper getScriptingComponentHelper() {
return this.@scriptingComponentHelper
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
/**
* An interface for retrieving the scripting component helper for a scripting processor. Aids in testing (for setting the Script Engine descriptor, for example).
*/
public interface AccessibleScriptingComponentHelper {
ScriptingComponentHelper getScriptingComponentHelper();
}

View File

@ -35,8 +35,8 @@ public abstract class BaseScriptTest {
public final String TEST_RESOURCE_LOCATION = "target/test/resources/";
protected TestRunner runner;
protected AccessibleScriptingComponentHelper scriptingComponent;
/**
* Copies all scripts to the target directory because when they are compiled they can leave unwanted .class files.
@ -49,17 +49,19 @@ public abstract class BaseScriptTest {
}
public void setupExecuteScript() throws Exception {
final ExecuteScript executeScript = new ExecuteScript();
final ExecuteScript executeScript = new AccessibleExecuteScript();
// Need to do something to initialize the properties, like retrieve the list of properties
assertNotNull(executeScript.getSupportedPropertyDescriptors());
runner = TestRunners.newTestRunner(executeScript);
scriptingComponent = (AccessibleScriptingComponentHelper) executeScript;
}
public void setupInvokeScriptProcessor() throws Exception {
final InvokeScriptedProcessor invokeScriptedProcessor = new InvokeScriptedProcessor();
final InvokeScriptedProcessor invokeScriptedProcessor = new AccessibleInvokeScriptedProcessor();
// Need to do something to initialize the properties, like retrieve the list of properties
assertNotNull(invokeScriptedProcessor.getSupportedPropertyDescriptors());
runner = TestRunners.newTestRunner(invokeScriptedProcessor);
scriptingComponent = (AccessibleScriptingComponentHelper) invokeScriptedProcessor;
}
public String getFileContentsAsString(String path) {
@ -69,4 +71,18 @@ public abstract class BaseScriptTest {
return null;
}
}
class AccessibleExecuteScript extends ExecuteScript implements AccessibleScriptingComponentHelper {
@Override
public ScriptingComponentHelper getScriptingComponentHelper() {
return this.scriptingComponentHelper;
}
}
class AccessibleInvokeScriptedProcessor extends InvokeScriptedProcessor implements AccessibleScriptingComponentHelper {
@Override
public ScriptingComponentHelper getScriptingComponentHelper() {
return this.scriptingComponentHelper;
}
}
}

View File

@ -29,7 +29,7 @@ import static org.junit.Assert.assertEquals;
public class TestExecuteGroovy extends BaseScriptTest {
public final String TEST_CSV_DATA = "gender,title,first,last\n"
private final String TEST_CSV_DATA = "gender,title,first,last\n"
+ "female,miss,marlene,shaw\n"
+ "male,mr,todd,graham";
@ -46,9 +46,9 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger.groovy");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
@ -67,9 +67,9 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testNoIncomingFlowFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger.groovy");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
runner.assertValid();
runner.run();
@ -86,9 +86,9 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testInvalidConfiguration() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION);
runner.setProperty(ExecuteScript.SCRIPT_BODY, "body");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION);
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "body");
runner.assertNotValid();
}
@ -101,9 +101,9 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testCreateNewFlowFileWithScriptFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger_newFlowFile.groovy");
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_onTrigger_newFlowFile.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
@ -124,8 +124,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testCreateNewFlowFileWithNoInputFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY,
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
getFileContentsAsString(TEST_RESOURCE_LOCATION + "groovy/testCreateNewFlowFileWithNoInputFile.groovy")
);
@ -145,8 +145,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testDynamicProperties() throws Exception {
runner.setValidateExpressionUsage(true);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_dynamicProperties.groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/test_dynamicProperties.groovy");
runner.setProperty("myProp", "${myAttr}");
runner.assertValid();
@ -169,9 +169,9 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testChangeFlowFileWithScriptFile() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_FILE, "target/test/resources/groovy/test_onTrigger_changeContent.groovy");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_onTrigger_changeContent.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
runner.assertValid();
runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
@ -193,11 +193,11 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBody() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBody.groovy")
);
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
@ -217,8 +217,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBodyNoModules() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBodyNoModules.groovy")
);
runner.assertValid();
@ -238,8 +238,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test(expected = AssertionError.class)
public void testScriptNoTransfer() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testScriptNoTransfer.groovy")
);
@ -257,8 +257,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileCustomAttribute() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testReadFlowFileContentAndStoreInFlowFileCustomAttribute.groovy")
);
runner.setProperty("testprop", "test content");
@ -281,8 +281,8 @@ public class TestExecuteGroovy extends BaseScriptTest {
@Test(expected = AssertionError.class)
public void testScriptException() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, "throw new Exception()");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "throw new Exception()");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));

View File

@ -43,9 +43,9 @@ public class TestExecuteJRuby extends BaseScriptTest {
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExecuteScript());
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "ruby");
runner.setProperty(ExecuteScript.SCRIPT_FILE, "target/test/resources/jruby/test_onTrigger.rb");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/jruby");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ruby");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jruby/test_onTrigger.rb");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jruby");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));

View File

@ -42,9 +42,9 @@ public class TestExecuteJavascript extends BaseScriptTest {
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExecuteScript());
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ExecuteScript.SCRIPT_FILE, "target/test/resources/javascript/test_onTrigger.js");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/javascript");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/javascript/test_onTrigger.js");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/javascript");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));

View File

@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* Created by mburgess on 1/25/16.
* Unit tests for ExecuteScript with Jython.
*/
public class TestExecuteJython extends BaseScriptTest {
@ -41,8 +41,8 @@ public class TestExecuteJython extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBody() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "python");
runner.setProperty(ExecuteScript.SCRIPT_BODY,
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"from org.apache.nifi.processors.script import ExecuteScript\n"
+ "flowFile = session.get()\n"
+ "flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n"
@ -65,8 +65,8 @@ public class TestExecuteJython extends BaseScriptTest {
@Test(expected = AssertionError.class)
public void testScriptNoTransfer() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "python");
runner.setProperty(ExecuteScript.SCRIPT_BODY,
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n");
runner.assertValid();

View File

@ -43,9 +43,9 @@ public class TestExecuteLua extends BaseScriptTest {
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExecuteScript());
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "lua");
runner.setProperty(ExecuteScript.SCRIPT_FILE, "target/test/resources/lua/test_onTrigger.lua");
runner.setProperty(ExecuteScript.MODULES, "target/test/resources/lua");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "lua");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/lua/test_onTrigger.lua");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/lua");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));

View File

@ -50,9 +50,9 @@ public class TestInvokeGroovy extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "Groovy");
runner.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
runner.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/groovy");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
@ -77,9 +77,9 @@ public class TestInvokeGroovy extends BaseScriptTest {
processor.initialize(initContext);
context.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "Groovy");
context.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
context.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/groovy");
context.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
context.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
context.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
// State Manger is unused, and a null reference is specified
processor.customValidate(new MockValidationContext(context));
processor.setup(context);
@ -111,8 +111,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
processor.initialize(initContext);
context.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "Groovy");
context.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
context.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
context.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_reader.groovy");
// State Manger is unused, and a null reference is specified
processor.customValidate(new MockValidationContext(context));
processor.setup(context);
@ -140,8 +140,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
public void testInvokeScriptCausesException() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testInvokeScriptCausesException.groovy")
);
runner.assertValid();
@ -158,8 +158,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
@Test
public void testScriptRoutesToFailure() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "Groovy");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "groovy/testScriptRoutesToFailure.groovy")
);
runner.assertValid();

View File

@ -52,9 +52,9 @@ public class TestInvokeJavascript extends BaseScriptTest {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
runner.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/javascript");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/javascript");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
@ -80,9 +80,9 @@ public class TestInvokeJavascript extends BaseScriptTest {
processor.initialize(initContext);
context.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "ECMAScript");
context.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
context.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/javascript");
context.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
context.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
context.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/javascript");
// State Manger is unused, and a null reference is specified
processor.customValidate(new MockValidationContext(context));
processor.setup(context);
@ -115,9 +115,9 @@ public class TestInvokeJavascript extends BaseScriptTest {
processor.initialize(initContext);
context.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "ECMAScript");
context.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
context.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/javascript");
context.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
context.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/javascript/test_reader.js");
context.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/javascript");
// State Manger is unused, and a null reference is specified
processor.customValidate(new MockValidationContext(context));
@ -146,8 +146,8 @@ public class TestInvokeJavascript extends BaseScriptTest {
public void testInvokeScriptCausesException() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "javascript/testInvokeScriptCausesException.js")
);
runner.assertValid();
@ -164,8 +164,8 @@ public class TestInvokeJavascript extends BaseScriptTest {
@Test
public void testScriptRoutesToFailure() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ExecuteScript.SCRIPT_BODY, getFileContentsAsString(
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "ECMAScript");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getFileContentsAsString(
TEST_RESOURCE_LOCATION + "javascript/testScriptRoutesToFailure.js")
);
runner.assertValid();

View File

@ -52,8 +52,8 @@ public class TestInvokeJython extends BaseScriptTest {
public void testAlwaysInvalid() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "python");
runner.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
final Collection<ValidationResult> results = ((MockProcessContext) runner.getProcessContext()).validate();
Assert.assertEquals(1L, results.size());
@ -72,8 +72,8 @@ public class TestInvokeJython extends BaseScriptTest {
public void testUpdateAttributeFromProcessorPropertyAndFlowFileAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "python");
runner.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py");
runner.setProperty("for-attributes", "value-1");
final Map<String, String> attributes = new HashMap<>();
@ -102,9 +102,9 @@ public class TestInvokeJython extends BaseScriptTest {
public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "python");
runner.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/jython/test_reader.py");
runner.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/jython");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_reader.py");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
@ -124,9 +124,9 @@ public class TestInvokeJython extends BaseScriptTest {
public void testCompressor() throws Exception {
final TestRunner one = TestRunners.newTestRunner(new InvokeScriptedProcessor());
one.setValidateExpressionUsage(false);
one.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "python");
one.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
one.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/jython");
one.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
one.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
one.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython");
one.setProperty("mode", "compress");
one.assertValid();
@ -138,9 +138,9 @@ public class TestInvokeJython extends BaseScriptTest {
final TestRunner two = TestRunners.newTestRunner(new InvokeScriptedProcessor());
two.setValidateExpressionUsage(false);
two.setProperty(InvokeScriptedProcessor.SCRIPT_ENGINE, "python");
two.setProperty(InvokeScriptedProcessor.MODULES, "target/test/resources/jython");
two.setProperty(InvokeScriptedProcessor.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
two.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
two.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython");
two.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
two.setProperty("mode", "decompress");
two.assertValid();
@ -160,9 +160,9 @@ public class TestInvokeJython extends BaseScriptTest {
@Test
public void testInvalidConfiguration() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "python");
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION);
runner.setProperty(ExecuteScript.SCRIPT_BODY, "body");
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION);
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "body");
runner.assertNotValid();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def events = context.eventAccess.getProvenanceEvents(1,5)
events.eachWithIndex { event, i ->
log.info("Event[$i] ID = ${event.eventId}")
}
x = events.size()
e = events[0]

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
x = [
'uptime' : vmMetrics.uptime(),
'heapUsed' : vmMetrics.heapUsed(),
'heapUsage' : vmMetrics.heapUsage(),
'nonHeapUsage' : vmMetrics.nonHeapUsage(),
'threadCount' : vmMetrics.threadCount(),
'daemonThreadCount' : vmMetrics.daemonThreadCount(),
'fileDescriptorUsage': vmMetrics.fileDescriptorUsage()
]