NIFI-12378 Removed Jython

This closes #8040

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Joseph Witt 2023-11-15 16:35:47 -07:00 committed by exceptionfactory
parent a3983ae248
commit 9cfdebc3e8
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
40 changed files with 22 additions and 1369 deletions

View File

@ -72,11 +72,6 @@
<artifactId>ivy</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
@ -94,12 +89,6 @@
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<!-- For Jython 2.7.1 -->
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<version>2.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@ -143,7 +132,6 @@
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/xmlRecord.xml</exclude>
<exclude>src/test/resources/jython/test_compress.py</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -90,14 +90,14 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
_temp.addAll(scriptingComponentHelper.getDescriptors());
_temp.remove(scriptingComponentHelper.SCRIPT_ENGINE);
PropertyDescriptor.Builder jythonLessEngineProp = new PropertyDescriptor
PropertyDescriptor.Builder engineProp = new PropertyDescriptor
.Builder().fromPropertyDescriptor(scriptingComponentHelper.SCRIPT_ENGINE);
List<AllowableValue> filtered = scriptingComponentHelper.getScriptEngineAllowableValues()
.stream().filter(allowableValue -> !allowableValue.getValue().contains("ython"))
.stream()
.collect(Collectors.toList());
jythonLessEngineProp.allowableValues(filtered.toArray(new AllowableValue[filtered.size()]));
engineProp.allowableValues(filtered.toArray(new AllowableValue[filtered.size()]));
supportedPropertyDescriptors.add(jythonLessEngineProp.build());
supportedPropertyDescriptors.add(engineProp.build());
supportedPropertyDescriptors.addAll(_temp);
final ConfigurableComponent instance = lookupService.get();

View File

@ -33,10 +33,9 @@ import java.util.Set;
/**
* A Controller service that allows the user to script the lookup operation to be performed (by LookupRecord, e.g.)
*/
@Tags({"lookup", "record", "script", "invoke", "groovy", "python", "jython"})
@Tags({"lookup", "record", "script", "invoke", "groovy"})
@CapabilityDescription("Allows the user to provide a scripted LookupService instance in order to enrich records from " +
"an incoming flow file. Please note, that due to a bug in Jython that remains unresolved, it is not possible to use " +
"Jython to write a script for this service in Python.")
"an incoming flow file.")
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")

View File

@ -33,12 +33,11 @@ import java.util.Set;
/**
* A Controller service that allows the user to script the lookup operation to be performed (by LookupAttribute, e.g.)
*/
@Tags({"lookup", "script", "invoke", "groovy", "python", "jython"})
@Tags({"lookup", "script", "invoke", "groovy"})
@CapabilityDescription("Allows the user to provide a scripted LookupService instance in order to enrich records from "
+ "an incoming flow file. The script is expected to return an optional string value rather than an arbitrary object (record, e.g.). "
+ "Also the scripted lookup service should implement StringLookupService, otherwise the getValueType() method must be implemented even "
+ "though it will be ignored, as SimpleScriptedLookupService returns String as the value type on the script's behalf. Please note that due to "
+ "a bug in Jython that remains unresolved, it is not possible to use Jython to write a script for this service in Python.")
+ "though it will be ignored, as SimpleScriptedLookupService returns String as the value type on the script's behalf.")
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")

View File

@ -64,7 +64,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"script", "execute", "groovy", "python", "jython", "clojure"})
@Tags({"script", "execute", "groovy", "clojure"})
@CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
+ "the script. If the handling is incomplete or incorrect, the session will be rolled back. Experimental: "

View File

@ -68,7 +68,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"script", "invoke", "groovy", "python", "jython"})
@Tags({"script", "invoke", "groovy"})
@CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define "
+ "a valid class that implements the Processor interface, and it must set a variable 'processor' to an instance of "
+ "the class. Processor methods such as onTrigger() will be delegated to the scripted Processor instance. Also any "

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.Record;
import javax.script.Bindings;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
class PythonScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final CompiledScript compiledScript;
private final Bindings bindings;
PythonScriptEvaluator(
final ScriptEngine scriptEngine,
final CompiledScript compiledScript,
final FlowFile flowFile,
final ComponentLog componentLog
) {
// By pre-compiling the script here, we get significant performance gains. A quick 5-minute benchmark
// shows gains of about 100x better performance. But even with the compiled script, performance pales
// in comparison with Groovy.
this.compiledScript = compiledScript;
this.scriptEngine = scriptEngine;
this.bindings = ScriptedTransformRecord.setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", componentLog);
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
compiledScript.eval(bindings);
return scriptEngine.get("_");
}
}

View File

@ -25,7 +25,7 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@Tags({"record", "filter", "script", "groovy", "jython", "python"})
@Tags({"record", "filter", "script", "groovy"})
@CapabilityDescription(
"This processor provides the ability to filter records out from FlowFiles using the user-provided script. " +
"Every record will be evaluated by the script which must return with a boolean value. " +

View File

@ -57,7 +57,7 @@ import java.util.Map;
import java.util.Set;
@SideEffectFree
@Tags({"record", "partition", "script", "groovy", "jython", "python", "segment", "split", "group", "organize"})
@Tags({"record", "partition", "script", "groovy", "segment", "split", "group", "organize"})
@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates the user provided script against "
+ "each record in the incoming flow file. Each record is then grouped with other records sharing the same partition and a FlowFile is created for each groups of records. " +
"Two records shares the same partition if the evaluation of the script results the same return value for both. Those will be considered as part of the same partition.")

View File

@ -50,7 +50,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
abstract class ScriptedRecordProcessor extends AbstractProcessor implements Searchable {
protected static final String PYTHON_SCRIPT_LANGUAGE = "python";
protected static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
protected volatile String scriptToRun = null;
@ -115,11 +114,6 @@ abstract class ScriptedRecordProcessor extends AbstractProcessor implements Sear
}
protected ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, final FlowFile flowFile) throws ScriptException {
if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
final CompiledScript compiledScript = getOrCompileScript((Compilable) scriptEngine, scriptToRun);
return new PythonScriptEvaluator(scriptEngine, compiledScript, flowFile, getLogger());
}
return new InterpretedScriptEvaluator(scriptEngine, scriptToRun, flowFile, getLogger());
}

View File

@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
@SupportsBatching
@SideEffectFree
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "transform", "script", "groovy", "jython", "python", "update", "modify", "filter"})
@Tags({"record", "transform", "script", "groovy", "update", "modify", "filter"})
@Restricted(restrictions = {
@Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")

View File

@ -25,7 +25,7 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@Tags({"record", "validate", "script", "groovy", "jython", "python"})
@Tags({"record", "validate", "script", "groovy"})
@CapabilityDescription(
"This processor provides the ability to validate records in FlowFiles using the user-provided script. " +
"The script is expected to have a record as incoming argument and return with a boolean value. " +

View File

@ -47,7 +47,7 @@ import java.util.Map;
/**
* A RecordReader implementation that allows the user to script the RecordReader instance
*/
@Tags({"record", "recordFactory", "script", "invoke", "groovy", "python", "jython"})
@Tags({"record", "recordFactory", "script", "invoke", "groovy"})
@CapabilityDescription("Allows the user to provide a scripted RecordReaderFactory instance in order to read/parse/generate records from an incoming flow file.")
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,

View File

@ -47,7 +47,7 @@ import java.util.Map;
/**
* A RecordSetWriter implementation that allows the user to script the RecordWriter instance
*/
@Tags({"record", "writer", "script", "invoke", "groovy", "python", "jython"})
@Tags({"record", "writer", "script", "invoke", "groovy"})
@CapabilityDescription("Allows the user to provide a scripted RecordSetWriterFactory instance in order to write records to an outgoing flow file.")
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,

View File

@ -50,7 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"record", "record sink", "script", "invoke", "groovy", "python", "jython"})
@Tags({"record", "record sink", "script", "invoke", "groovy"})
@CapabilityDescription("Allows the user to provide a scripted RecordSinkService instance in order to transmit records to the desired target. The script must set a variable 'recordSink' to an "
+ "implementation of RecordSinkService.")
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",

View File

@ -56,7 +56,7 @@ import java.util.Map;
* A Reporting task whose body is provided by a script (via supported JSR-223 script engines)
*/
@SupportsSensitiveDynamicProperties
@Tags({"reporting", "script", "execute", "groovy", "python", "jython"})
@Tags({"reporting", "script", "execute", "groovy"})
@CapabilityDescription("Provides reporting and status information to a script. ReportingContext, ComponentLog, and VirtualMachineMetrics objects are made available "
+ "as variables (context, log, and vmMetrics, respectively) to the script for further processing. The context makes various information available such "
+ "as events, provenance, bulletins, controller services, process groups, Java Virtual Machine metrics, etc.")

View File

@ -21,7 +21,6 @@ import org.apache.nifi.processors.script.ScriptRunner;
import org.apache.nifi.script.impl.ClojureScriptRunner;
import org.apache.nifi.script.impl.GenericScriptRunner;
import org.apache.nifi.script.impl.GroovyScriptRunner;
import org.apache.nifi.script.impl.JythonScriptRunner;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
@ -51,9 +50,6 @@ public class ScriptRunnerFactory {
if ("Groovy".equals(scriptEngineName)) {
return new GroovyScriptRunner(scriptEngine, scriptToRun, null);
}
if ("python".equals(scriptEngineName)) {
return new JythonScriptRunner(scriptEngine, scriptToRun, modulePaths);
}
if ("Clojure".equals(scriptEngineName)) {
return new ClojureScriptRunner(scriptEngine, scriptToRun, null);
}

View File

@ -267,11 +267,7 @@ public class ScriptingComponentHelper {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
if ("python".equalsIgnoreCase(scriptEngineName)) {
modules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources();
} else {
modules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources().flattenRecursively();
}
modules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources().flattenRecursively();
}
public void stop() {

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.script.impl;
import org.python.core.PyString;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* A helper class to configure the Jython engine with any specific requirements
*/
public class JythonScriptRunner extends BaseScriptRunner {
private final CompiledScript compiledScript;
public JythonScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
super(engine, scriptBody, buildPreloads(modulePaths), modulePaths);
// Add prefix for import sys and all jython modules
compiledScript = ((Compilable) engine).compile(this.scriptBody);
}
private static String buildPreloads(String[] modulePaths) {
return "import sys\n"
+ Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
.collect(Collectors.joining("\n")) + "\n";
}
@Override
public String getScriptEngineName() {
return "python";
}
@Override
public ScriptEngine getScriptEngine() {
return scriptEngine;
}
@Override
public void run(Bindings bindings) throws ScriptException {
if (compiledScript == null) {
throw new ScriptException("Jython script has not been successfully compiled");
}
compiledScript.eval();
}
}

View File

@ -14,5 +14,4 @@
# limitations under the License.
org.apache.nifi.script.impl.ClojureScriptRunner
org.apache.nifi.script.impl.JythonScriptRunner
org.apache.nifi.script.impl.GroovyScriptRunner

View File

@ -45,7 +45,6 @@ td {text-align: left}
<p>
<b>Notes:</b>
<ul>
<li>The engine listed as "python" in the list of available script engines is actually Jython, not Python. When using Jython, you cannot import pure (CPython) modules such as pandas</li>
<li>ExecuteScript uses the JSR-223 Script Engine API to evaluate scripts, so the use of idiomatic language structure is sometimes limited. For example, in the case of Groovy, there is a separate ExecuteGroovyScript processor that allows you to do many more idiomatic Groovy tasks. For example, it's easier to interact with Controller Services via ExecuteGroovyScript vs. ExecuteScript (see the ExecuteGroovyScript documentation for more details)</li>
</ul>
</p>
@ -101,12 +100,6 @@ td {text-align: left}
<pre>flowFile = session.get()
if(!flowFile) return
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Get multiple incoming FlowFiles from the session</strong>:</p>
@ -121,12 +114,6 @@ if(!flowFileList.isEmpty()) {
}
}
</pre>
<p><em>Jython</em></p>
<pre>flowFileList = session.get(100)
if not flowFileList.isEmpty():
for flowFile in flowFileList:
# Process each FlowFile here
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Create a new FlowFile</strong></p>
@ -137,10 +124,6 @@ if not flowFileList.isEmpty():
<pre>flowFile = session.create()
// Additional processing here
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.create()
# Additional processing here
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Create a new FlowFile from a parent FlowFile</strong></p>
@ -153,12 +136,6 @@ if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
newFlowFile = session.create(flowFile)
# Additional processing here
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Add an attribute to a FlowFile</strong></p>
@ -171,18 +148,11 @@ if (flowFile != None):
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Add multiple attributes to a FlowFile</strong></p>
<p><strong>Use Case</strong>: You have a FlowFile to which you'd like to add custom attributes.</p>
<p><strong>Approach</strong>: Use the putAllAttributes(<em>flowFile</em>, <em>attributeMap</em>) method from the session object. This method updates the given FlowFile's attributes with the key/value pairs from the given Map. NOTE: The "uuid" attribute is fixed for a FlowFile and cannot be modified; if the key is named "uuid", it will be ignored.</p>
<p>The technique here is to create a Map (aka dictionary in Jython) of the attribute key/value pairs you'd like to update, then call putAllAttributes() on it. This is much more efficient than calling putAttribute() for each key/value pair, as the latter case will cause the framework to create a temporary version of the FlowFile for each attribute added (see above recipe for discussion on FlowFile immutability). The examples show a map of two entries myAttr1 and myAttr2, set to '1' and the language-specific coercion of the number 2 as a String (to adhere to the method signature of requiring String values for both key and value). Note that a session.transfer() is not specified here (so the code snippets below do not work as-is), see the following recipe for that.</p>
<p><strong>Examples</strong>:</p>
<p><em>Groovy</em></p>
<pre>attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
@ -190,13 +160,6 @@ flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
</pre>
<p><em>Jython</em></p>
<pre>attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Get an attribute from a FlowFile</strong></p>
@ -208,12 +171,6 @@ if (flowFile != None):
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
myAttr = flowFile.getAttribute('filename')
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Get all attributes from a FlowFile</strong></p>
@ -227,13 +184,6 @@ flowFile.getAttributes().each { key,value -&gt;
// Do something with the key/value pair
}
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
for key,value in flowFile.getAttributes().iteritems():
# Do something with key and/or value
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p><strong>Transfer a FlowFile to a relationship</strong></p>
<p><strong>Use Case</strong>: After processing a FlowFile (new or incoming), you want to transfer the FlowFile to a relationship ("success" or "failure"). In this simple case let us assume there is a variable called "errorOccurred" that indicates which relationship to which the FlowFile should be transferred. Additional error handling techniques will be discussed in part 2 of this series.</p>
@ -251,16 +201,6 @@ else {
session.transfer(flowFile, REL_SUCCESS)
}
</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
if errorOccurred:
session.transfer(flowFile, REL_FAILURE)
else:
session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p><strong>Send a message to the log at a specified logging level</strong></p>
<p><strong>Use Case</strong>: You want to report some event that has occurred during processing to the logging framework.</p>
@ -269,13 +209,6 @@ if (flowFile != None):
<p><em>Groovy</em></p>
<pre>log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
</pre>
<p><em>Jython</em></p>
<pre>from java.lang import Object
from jarray import array
objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Read the contents of an incoming FlowFile using a callback</strong></p>
@ -293,25 +226,6 @@ session.read(flowFile, {inputStream -&gt;
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
// Do something with text here
} as InputStreamCallback)</pre>
<p><em>Jython</em></p>
<pre>from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
def __init__(self):
pass
def process(self, inputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# Do something with text here
# end class
flowFile = session.get()
if(flowFile != None):
session.read(flowFile, PyInputStreamCallback())
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Write content to an outgoing FlowFile using a callback</strong></p>
@ -330,24 +244,6 @@ def text = 'Hello world!'
flowFile = session.write(flowFile, {outputStream -&gt;
outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)</pre>
<p><em>Jython</em></p>
<pre>from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self):
pass
def process(self, outputStream):
outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Overwrite an incoming FlowFile with updated content using a callback</strong></p>
@ -371,25 +267,6 @@ flowFile = session.write(flowFile, {inputStream, outputStream -&gt;
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)</pre>
<p><em>Jython</em></p>
<pre>from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
flowFile = session.write(flowFile, PyStreamCallback())
# implicit return at the end
</pre>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p><strong>Handle errors during script processing</strong></p>
@ -411,19 +288,5 @@ try {
session.transfer(flowFile, REL_FAILURE)
}</pre>
<p><em>Jython</em></p>
<pre>flowFile = session.get()
if(flowFile != None):
try:
# Something that might throw an exception here
# Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
except:
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
</pre>
</body>
</html>

View File

@ -33,7 +33,7 @@ td {text-align: left}
<h3>Description</h3>
<p>
The ScriptedFilterRecord Processor provides the ability to use a scripting language, such as Groovy or Jyton in order to remove Records from an incoming FlowFile.
The ScriptedFilterRecord Processor provides the ability to use a scripting language, such as Groovy in order to remove Records from an incoming FlowFile.
NiFi provides several different Processors that can be used to work with Records in different ways. Each of these processors has its pros and cons.
The ScriptedFilterRecord is intended to work together with these processors and be used as a pre-processing step before processing the FlowFile with more performance consuming Processors, like ScriptedTransformRecord.
</p>
@ -144,16 +144,6 @@ return recordIndex < 2 ? true : false
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = True if (recordIndex < 2) else False
</code>
</pre>
<h3>Filtering based on Record contents</h3>
<p>
@ -216,16 +206,5 @@ if (record.getValue("allyOf") == "Athens") {
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
allyOf = record.getValue("allyOf")
_ = True if (allyOf == "Athens") else False
</code>
</pre>
</body>
</html>

View File

@ -33,7 +33,7 @@ td {text-align: left}
<h3>Description</h3>
<p>
The ScriptedPartitionRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily partition a Record based on its contents.
The ScriptedPartitionRecord provides the ability to use a scripting language, such as Groovy, to quickly and easily partition a Record based on its contents.
There are multiple ways to reach the same behaviour such as using PartitionRecord but working with user provided scripts opens a wide range of possibilities on the decision
logic of partitioning the individual records.
</p>
@ -162,13 +162,5 @@ Tau Ceti,G
return record.getValue("stellarType")
</code>
<p>
Example Script (Python):
</p>
<code>
_ = record.getValue("stellarType")
</code>
</body>
</html>

View File

@ -33,7 +33,7 @@ td {text-align: left}
<h3>Description</h3>
<p>
The ScriptedTransformRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily update the contents of a Record.
The ScriptedTransformRecord provides the ability to use a scripting language, such as Groovy, to quickly and easily update the contents of a Record.
NiFi provides several different Processors that can be used to manipulate Records in different ways. Each of these processors has its pros and cons. The ScriptedTransformRecord is perhaps
the most powerful and most versatile option. However, it is also the most error-prone, as it depends on writing custom scripts. It is also likely to yield the lowest performance,
as processors and libraries written directly in Java are likely to perform better than interpreted scripts.
@ -109,12 +109,6 @@ td {text-align: left}
a Record or Collection of Records, the incoming FlowFile will be routed to the <code>failure</code> relationship.
</p>
<p>
Note that the Python language does not allow a script to use a <code>return</code> outside of a method. Additionally, when interpreted as a script,
the Java Python scripting engine does not provide a reliable way to easily obtain the last value referenced. As a result, any Python script must assign the value to be returned
to the <code>_</code> variable. See examples below.
</p>
<p>
The Record that is provided to the script is mutable. Therefore, it is a common pattern to update the <code>record</code> object in the script and simply return that same
<code>record</code> object.
@ -151,11 +145,6 @@ td {text-align: left}
situations when performance is critical, the best case is to test both approaches to see which performs best.
</p>
<p>
It is important to note also, though, that not all scripting languages and script engines are equal. For example, Groovy scripts will typically run much faster than Jython scripts.
However, if those in your organization are more familiar with Python than Java or Groovy, then using Jython may still make more sense.
</p>
<p>
A simple 5-minute benchmark was done to analyze the difference in performance. The script used simply modifies one field and return the Record otherwise unmodified.
The results are shown below. Note that no specifics are given with regards to hardware, specifically because the results should not be used to garner expectations of
@ -182,15 +171,6 @@ record
</td>
<td>18.9 million</td>
</tr>
<tr>
<td>ScriptedTransformRecord - Using Language: python</td>
<td>
<pre><code>record.setValue("num", 42)
_ = record
</code></pre>
</td>
<td>21.0 million</td>
</tr>
</table>
@ -236,18 +216,6 @@ return recordIndex == 0 ? null : record
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = None if (recordIndex == 0) else record
</code>
</pre>
<h3>Replace Field Value</h3>
@ -363,17 +331,6 @@ record
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = record
</code>
</pre>
<a name="AddingFieldExample"></a>

View File

@ -159,16 +159,6 @@ return recordIndex < 2 ? true : false
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = True if (recordIndex < 2) else False
</code>
</pre>
<h3>Validating based on Record contents</h3>
<p>
@ -250,16 +240,5 @@ if (record.getValue("numberOfTrains").toInteger() >= 0) {
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
trains = record.getValue("numberOfTrains")
_ = True if (trains >= 0) else False
</code>
</pre>
</body>
</html>

View File

@ -1,128 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Unit tests for ExecuteScript with Jython.
*/
public class TestExecuteJython extends BaseScriptTest {
@BeforeEach
public void setup() throws Exception {
super.setupExecuteScript();
}
/**
* Tests a Jython script that has provides the body of an onTrigger() function.
*
* @throws Exception Any error encountered while testing
*/
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttributeWithScriptBody() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"from org.apache.nifi.processors.script import ExecuteScript\n"
+ "flowFile = session.get()\n"
+ "flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n"
+ "session.transfer(flowFile, ExecuteScript.REL_SUCCESS)");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("from-content", "test content");
}
/**
* Tests a Jython script that references an outside python module
*
*/
@Test
public void testAccessModuleAndStoreInFlowFileAttributeWithScriptBody() {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython/");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"from org.apache.nifi.processors.script import ExecuteScript\n"
+ "from test_external_module import ExternalModule\n"
+ "externalModule = ExternalModule()\n"
+ "flowFile = session.get()\n"
+ "flowFile = session.putAttribute(flowFile, \"key\", externalModule.testHelloWorld())\n"
+ "session.transfer(flowFile, ExecuteScript.REL_SUCCESS)");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("key", "helloWorld");
}
/**
* Tests a script that does not transfer or remove the original flow file, thereby causing an error during commit.
*
*/
@Test
public void testScriptNoTransfer() {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
assertThrows(AssertionError.class, () -> runner.run());
}
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
@Test
public void testPerformance() {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY,
"from org.apache.nifi.processors.script import ExecuteScript\n"
+ "flowFile = session.get()\n"
+ "flowFile = session.putAttribute(flowFile, \"from-content\", \"test content\")\n"
+ "session.transfer(flowFile, ExecuteScript.REL_SUCCESS)");
runner.assertValid();
final int ITERATIONS = 50000;
for (int i = 0; i < ITERATIONS; i++) {
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
}
runner.run(ITERATIONS);
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS);
result.get(0).assertAttributeEquals("from-content", "test content");
}
}

View File

@ -1,193 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestInvokeJython extends BaseScriptTest {
/**
* Copies all scripts to the target directory because when they are compiled they can leave unwanted .class files.
*
* @throws Exception Any error encountered while testing
*/
@BeforeEach
public void setup() throws Exception {
super.setupInvokeScriptProcessor();
}
/**
* Tests a script that has a Jython processor that is always invalid.
*
*/
@Test
public void testAlwaysInvalid() {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
final Collection<ValidationResult> results = ((MockProcessContext) runner.getProcessContext()).validate();
assertEquals(1L, results.size());
assertEquals("Never valid.", results.iterator().next().getExplanation());
}
/**
* Tests a script that has a Jython processor that begins invalid then is fixed.
*
*/
@Test
public void testInvalidThenFixed() {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_invalid.py");
final Collection<ValidationResult> results = ((MockProcessContext) runner.getProcessContext()).validate();
assertEquals(1L, results.size());
assertEquals("Never valid.", results.iterator().next().getExplanation());
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py");
runner.setProperty("for-attributes", "value-1");
final Map<String, String> attributes = new HashMap<>();
attributes.put("for-attributes", "value-2");
runner.assertValid();
runner.enqueue(new byte[0], attributes);
runner.run();
}
/**
* Test a script that has a Jython processor that reads a value from a processor property and another from a flowfile attribute then stores both in the attributes of the flowfile being routed.
* <p>
* This may seem contrived but it verifies that the Jython processors properties are being considered and are able to be set and validated. It verifies the processor is able to access the property
* values and flowfile attribute values during onTrigger. Lastly, it verifies the processor is able to route the flowfile to a relationship it specified.
*
*/
@Test
public void testUpdateAttributeFromProcessorPropertyAndFlowFileAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_update_attribute.py");
runner.setProperty("for-attributes", "value-1");
final Map<String, String> attributes = new HashMap<>();
attributes.put("for-attributes", "value-2");
runner.assertValid();
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
// verify reading a property value
result.get(0).assertAttributeEquals("from-property", "value-1");
// verify reading an attribute value
result.get(0).assertAttributeEquals("from-attribute", "value-2");
}
/**
* Tests a script that has a Jython Processor that that reads the first line of text from the flowfiles content and stores the value in an attribute of the outgoing flowfile.
*
*/
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new InvokeScriptedProcessor());
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_reader.py");
// Use EL to populate MODULES property
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/${literal('JYTHON'):toLower()}");
runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
result.get(0).assertAttributeEquals("from-content", "test content");
}
/**
* Tests compression and decompression using two different InvokeScriptedProcessor processor instances. A string is compressed and decompressed and compared.
*
*/
@Test
public void testCompressor() {
final TestRunner one = TestRunners.newTestRunner(new InvokeScriptedProcessor());
one.setValidateExpressionUsage(false);
one.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
one.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
one.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython");
one.setProperty("mode", "compress");
one.assertValid();
one.enqueue("test content".getBytes(StandardCharsets.UTF_8));
one.run();
one.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> oneResult = one.getFlowFilesForRelationship("success");
final TestRunner two = TestRunners.newTestRunner(new InvokeScriptedProcessor());
two.setValidateExpressionUsage(false);
two.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
two.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/jython");
two.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/jython/test_compress.py");
two.setProperty("mode", "decompress");
two.assertValid();
two.enqueue(oneResult.get(0));
two.run();
two.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> twoResult = two.getFlowFilesForRelationship("success");
assertEquals("test content", new String(twoResult.get(0).toByteArray(), StandardCharsets.UTF_8));
}
/**
* Tests a script file that creates and transfers a new flow file.
*
*/
@Test
public void testInvalidConfiguration() {
runner.setValidateExpressionUsage(false);
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "python");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION);
runner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "body");
runner.assertNotValid();
}
}

View File

@ -56,12 +56,6 @@ public class TestScriptedTransformRecord {
testPassThrough("Groovy", "record");
}
@Test
public void testSimpleJythonScript() throws InitializationException {
testPassThrough("python", "_ = record");
}
private void testPassThrough(final String language, final String script) throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
@ -408,66 +402,6 @@ public class TestScriptedTransformRecord {
assertEquals("Unknown Author", outputRecords.get(1).getAsRecord("book", bookSchema).getValue("author"));
}
@Test
public void testRecompileJythonScript() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, "python");
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "_ = record");
final Map<String, Object> num1 = new HashMap<>();
num1.put("num", 1);
final Map<String, Object> num2 = new HashMap<>();
num2.put("num", 2);
final Map<String, Object> num3 = new HashMap<>();
num3.put("num", 3);
recordReader.addRecord(new MapRecord(schema, num1));
recordReader.addRecord(new MapRecord(schema, num2));
recordReader.addRecord(new MapRecord(schema, num3));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, recordsWritten.get(i).getAsInt("num").intValue());
}
testRunner.clearTransferState();
// reset the writer
testRunner.removeControllerService(recordWriter);
recordWriter = new ArrayListRecordWriter(schema);
testRunner.addControllerService("record-writer", recordWriter);
testRunner.enableControllerService(recordWriter);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "record.setValue(\"num\", 5)\n_ = record");
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i = 0; i < 3; i++) {
assertEquals(5, recordsWritten.get(i).getAsInt("num").intValue());
}
}
private Record createBook(final String author, final String date, final RecordSchema bookSchema, final RecordSchema outerSchema) {
final Map<String, Object> firstBookValues = new HashMap<>();
firstBookValues.put("author", author);

View File

@ -36,7 +36,6 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.script.ScriptEngine;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -127,23 +126,6 @@ class ScriptedReportingTaskTest {
assertTrue(x.get("uptime") >= 0);
}
@Test
void testVMEventsJythonScript() throws Exception {
properties.put(SCRIPT_ENGINE_PROPERTY_DESCRIPTOR, "python");
Files.copy(Paths.get("src/test/resources/jython/test_log_vm_stats.py"), targetPath, StandardCopyOption.REPLACE_EXISTING);
properties.put(ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
reportingContext.setProperty(SCRIPT_ENGINE, "python");
reportingContext.setProperty(ScriptingComponentUtils.SCRIPT_FILE.getName(), targetPath.toString());
run();
// This script should store a variable called x with a map of stats to values
ScriptEngine se = task.getScriptRunner().getScriptEngine();
@SuppressWarnings("unchecked")
final Map<String, BigInteger> x = (Map<String, BigInteger>)se.get("x");
assertTrue(x.get("uptime").longValue() >= 0);
}
private void run() throws Exception {
task.initialize(initContext);
task.getSupportedPropertyDescriptors();

View File

@ -1,23 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from read_first_line import ReadFirstLine
from compress import Compress
from decompress import Decompress

View File

@ -1,49 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
import bz2
from jarray import zeros
from org.python.core.util import StringUtil
from org.apache.nifi.processor.io import StreamCallback
class Compress(StreamCallback) :
__line = None;
def __init__(self) :
pass
def process(self, input, output) :
try :
comp = bz2.BZ2Compressor()
buf = zeros(8192, "b")
while True :
bytes_read = input.read(buf)
if(bytes_read == -1) :
break
output.write(comp.compress(StringUtil.fromBytes(buf, 0, bytes_read)))
output.write(comp.flush())
except :
print "Exception in Compress:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
raise

View File

@ -1,48 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
import bz2
from jarray import zeros
from org.python.core.util import StringUtil
from org.apache.nifi.processor.io import StreamCallback
class Decompress(StreamCallback) :
__line = None;
def __init__(self) :
pass
def process(self, input, output) :
try :
comp = bz2.BZ2Decompressor()
buf = zeros(8192, "b")
while True :
bytes_read = input.read(buf)
if(bytes_read == -1) :
break
output.write(comp.decompress(StringUtil.fromBytes(buf, 0, bytes_read)))
except :
print "Exception in Decompress:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
raise

View File

@ -1,50 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
from org.apache.nifi.processor.io import InputStreamCallback
from java.io import BufferedReader, InputStreamReader
class ReadFirstLine(InputStreamCallback) :
__line = None;
def __init__(self) :
pass
def getLine(self) :
return self.__line
def process(self, input) :
try :
reader = InputStreamReader(input)
bufferedReader = BufferedReader(reader)
self.__line = bufferedReader.readLine()
except :
print "Exception in Reader:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
raise
finally :
if bufferedReader is not None :
bufferedReader.close()
if reader is not None :
reader.close()

View File

@ -1,57 +0,0 @@
import traceback
# There is no Apache header on this file in order to reproduce the issue in NIFI-9596
# where the first line gets appended to the last line of the additional modules
from callbacks import Compress, Decompress
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
class CompressFlowFile(Processor) :
__rel_success = Relationship.Builder().description("Success").name("success").build()
def __init__(self) :
pass
def initialize(self, context) :
pass
def getRelationships(self) :
return set([self.__rel_success])
def validate(self, context) :
pass
def getPropertyDescriptors(self) :
descriptor = PropertyDescriptor.Builder().name("mode").allowableValues("compress", "decompress").required(True).build()
return [descriptor]
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
session = sessionFactory.createSession()
try :
# ensure there is work to do
flowfile = session.get()
if flowfile is None :
return
if context.getProperty("mode").getValue() == "compress" :
flowfile = session.write(flowfile, Compress())
else :
flowfile = session.write(flowfile, Decompress())
# transfer
session.transfer(flowfile, self.__rel_success)
session.commitAsync()
except :
print sys.exc_info()[0]
print "Exception in TestReader:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
session.rollback(true)
raise
processor = CompressFlowFile()

View File

@ -1,29 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
class ExternalModule :
def __init__(self) :
pass
def testHelloWorld(self) :
return "helloWorld"

View File

@ -1,48 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from org.apache.nifi.processor import Processor
from org.apache.nifi.components import ValidationResult
class AlwaysInvalid(Processor) :
def __init__(self) :
pass
def initialize(self, context) :
pass
def getRelationships(self) :
pass
def validate(self, context) :
error = ValidationResult.Builder().subject("Processor Error").valid(False).explanation("Never valid.").build()
return [error]
def getPropertyDescriptors(self) :
pass
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
pass
processor = AlwaysInvalid()

View File

@ -1,29 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
x = {
'uptime': vmMetrics.uptime(),
'heapUsed': vmMetrics.heapUsed(None),
'heapUsage': vmMetrics.heapUsage(),
'nonHeapUsage': vmMetrics.nonHeapUsage(),
'threadCount': vmMetrics.threadCount(),
'daemonThreadCount': vmMetrics.daemonThreadCount(),
'fileDescriptorUsage': vmMetrics.fileDescriptorUsage()
}

View File

@ -1,74 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
class ReadModulesPathAndStoreAsAttribute(Processor) :
__rel_success = Relationship.Builder().description("Success").name("success").build()
def __init__(self) :
pass
def initialize(self, context) :
pass
def getRelationships(self) :
return set([self.__rel_success])
def validate(self, context) :
pass
def getPropertyDescriptors(self) :
pass
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
session = sessionFactory.createSession()
try :
# ensure there is work to do
flowfile = session.get()
if flowfile is None :
return
# Extract sys.path and encode as a comma-separated string
sysPathString = ','.join(sys.path)
# set an attribute with the captured contents of sys.path
flowfile = session.putAttribute(flowfile, "from-path", sysPathString)
# transfer
session.transfer(flowfile, self.__rel_success)
session.commitAsync()
except :
print sys.exc_info()[0]
print "Exception in TestReader:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
session.rollback(True)
raise
processor = ReadModulesPathAndStoreAsAttribute()

View File

@ -1,74 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
from callbacks import ReadFirstLine
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
class ReadContentAndStoreAsAttribute(Processor) :
__rel_success = Relationship.Builder().description("Success").name("success").build()
def __init__(self) :
pass
def initialize(self, context) :
pass
def getRelationships(self) :
return set([self.__rel_success])
def validate(self, context) :
pass
def getPropertyDescriptors(self) :
pass
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
session = sessionFactory.createSession()
try :
# ensure there is work to do
flowfile = session.get()
if flowfile is None :
return
reader = ReadFirstLine()
session.read(flowfile, reader);
# set an attribute
flowfile = session.putAttribute(flowfile, "from-content", reader.getLine())
# transfer
session.transfer(flowfile, self.__rel_success)
session.commitAsync()
except :
print sys.exc_info()[0]
print "Exception in TestReader:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
session.rollback(True)
raise
processor = ReadContentAndStoreAsAttribute()

View File

@ -1,79 +0,0 @@
#! /usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import traceback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.apache.nifi.processor.util import StandardValidators
class UpdateAttributes(Processor) :
__rel_success = Relationship.Builder().description("Success").name("success").build()
def __init__(self) :
pass
def initialize(self, context) :
pass
def getRelationships(self) :
return set([self.__rel_success])
def validate(self, context) :
pass
def getPropertyDescriptors(self) :
descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
return [descriptor]
def onPropertyModified(self, descriptor, newValue, oldValue) :
pass
def onTrigger(self, context, sessionFactory) :
session = sessionFactory.createSession()
try :
# ensure there is work to do
flowfile = session.get()
if flowfile is None :
return
# extract some attribute values
fromPropertyValue = context.getProperty("for-attributes").getValue()
fromAttributeValue = flowfile.getAttribute("for-attributes")
# set an attribute
flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
# transfer
session.transfer(flowfile, self.__rel_success)
session.commitAsync()
except :
print sys.exc_info()[0]
print "Exception in TestReader:"
print '-' * 60
traceback.print_exc(file=sys.stdout)
print '-' * 60
session.rollback(true)
raise
processor = UpdateAttributes()