NIFI-7572: Added ScriptedTransformRecord processor. Addressed a couple of minor bugs in mock classes that were encountered during testing.

NIFI-7572: Addressed review feedback
NIFI-7572: Fixed bug that resulted in constantly recompiling Jython script. Updated documentation showing performance difference. Fixed problematic unit tests for TestResizeImage

This closes #4374

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Mark Payne 2020-06-28 16:38:44 -04:00 committed by Mike Thomsen
parent e17db80514
commit 6cece9cad7
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
13 changed files with 1377 additions and 74 deletions

View File

@ -83,6 +83,22 @@ public class MockComponentLog implements ComponentLog {
return modifiedArgs;
}
private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t, final boolean appendThrowable) {
if (!appendThrowable) {
return addProcessorAndThrowable(os, t);
}
final Object[] modifiedArgs = new Object[os.length + 3];
modifiedArgs[0] = component.toString();
for (int i = 0; i < os.length; i++) {
modifiedArgs[i + 1] = os[i];
}
modifiedArgs[modifiedArgs.length - 2] = t.toString();
modifiedArgs[modifiedArgs.length - 1] = t;
return modifiedArgs;
}
private Object[] prependToArgs(final Object[] originalArgs, final Object... toAdd) {
final Object[] newArgs = new Object[originalArgs.length + toAdd.length];
System.arraycopy(toAdd, 0, newArgs, 0, toAdd.length);
@ -267,13 +283,10 @@ public class MockComponentLog implements ComponentLog {
@Override
public void error(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
os = addProcessorAndThrowable(os, t, true);
msg = "{} " + msg + ": {}";
logger.error(msg, os);
if (logger.isDebugEnabled()) {
logger.error("", t);
}
}
@Override

View File

@ -84,7 +84,7 @@ public class ArrayListRecordWriter extends AbstractControllerService implements
@Override
public WriteResult finishRecordSet() {
return null;
return WriteResult.of(records.size(), Collections.emptyMap());
}
@Override

View File

@ -30,7 +30,6 @@ import java.io.IOException;
import java.nio.file.Paths;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestResizeImage {
@ -85,9 +84,6 @@ public class TestResizeImage {
// Should return REL_FAILURE and log an IllegalArgumentException
runner.assertAllFlowFilesTransferred(ResizeImage.REL_FAILURE, 1);
assertEquals(1, runner.getLogger().getErrorMessages().size());
assertEquals(4, runner.getLogger().getErrorMessages().get(0).getArgs().length);
assertTrue(runner.getLogger().getErrorMessages().get(0).getArgs()[3].toString()
.startsWith("java.lang.IllegalArgumentException"));
}
@Test
@ -103,9 +99,6 @@ public class TestResizeImage {
// Should return REL_FAILURE and log a ProcessException
runner.assertAllFlowFilesTransferred(ResizeImage.REL_FAILURE, 1);
assertEquals(1, runner.getLogger().getErrorMessages().size());
assertEquals(4, runner.getLogger().getErrorMessages().get(0).getArgs().length);
assertTrue(runner.getLogger().getErrorMessages().get(0).getArgs()[3].toString()
.startsWith("org.apache.nifi.processor.exception.ProcessException"));
}
@Test
@ -121,9 +114,6 @@ public class TestResizeImage {
// Should return REL_FAILURE and log a NegativeArraySizeException
runner.assertAllFlowFilesTransferred(ResizeImage.REL_FAILURE, 1);
assertEquals(1, runner.getLogger().getErrorMessages().size());
assertEquals(4, runner.getLogger().getErrorMessages().get(0).getArgs().length);
assertTrue(runner.getLogger().getErrorMessages().get(0).getArgs()[3].toString()
.startsWith("java.lang.NegativeArraySizeException"));
}
@Test

View File

@ -16,9 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -59,13 +57,11 @@ import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "clojure"})
@ -96,7 +92,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
public static final Relationship REL_SUCCESS = ScriptingComponentUtils.REL_SUCCESS;
public static final Relationship REL_FAILURE = ScriptingComponentUtils.REL_FAILURE;
private String scriptToRun = null;
private volatile String scriptToRun = null;
volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
@ -282,34 +278,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
@Override
public Collection<SearchResult> search(SearchContext context) {
Collection<SearchResult> results = new ArrayList<>();
String term = context.getSearchTerm();
String scriptFile = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
String script = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
if (StringUtils.isBlank(script)) {
try {
script = IOUtils.toString(new FileInputStream(scriptFile), "UTF-8");
} catch (Exception e) {
getLogger().error(String.format("Could not read from path %s", scriptFile), e);
return results;
}
}
Scanner scanner = new Scanner(script);
int index = 1;
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (StringUtils.containsIgnoreCase(line, term)) {
String text = String.format("Matched script at line %d: %s", index, line);
results.add(new SearchResult.Builder().label(text).match(term).build());
}
index++;
}
return results;
return ScriptingComponentUtils.search(context, getLogger());
}
}

View File

@ -0,0 +1,412 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SupportsBatching
@SideEffectFree
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "transform", "script", "groovy", "jython", "python", "update", "modify", "filter"})
@Restricted(restrictions = {
@Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
})
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
})
@CapabilityDescription("Provides the ability to evaluate a simple script against each record in an incoming FlowFile. The script may transform the record in some way, filter the record, or fork " +
"additional records. See Processor's Additional Details for more information.")
@SeeAlso(classNames = {"org.apache.nifi.processors.script.ExecuteScript",
"org.apache.nifi.processors.standard.UpdateRecord",
"org.apache.nifi.processors.standard.QueryRecord",
"org.apache.nifi.processors.standard.JoltTransformRecord",
"org.apache.nifi.processors.standard.LookupRecord"})
public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
private static final String PYTHON_SCRIPT_LANGUAGE = "python";
private static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
static final PropertyDescriptor RECORD_READER = new Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("The Record Reader to use parsing the incoming FlowFile into Records")
.required(true)
.identifiesControllerService(RecordReaderFactory.class)
.build();
static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use for serializing Records after they have been transformed")
.required(true)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
static final PropertyDescriptor LANGUAGE = new Builder()
.name("Script Engine")
.displayName("Script Language")
.description("The Language to use for the script")
.allowableValues(SCRIPT_OPTIONS)
.defaultValue("Groovy")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Each FlowFile that were successfully transformed will be routed to this Relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be transformed will be routed to this Relationship")
.build();
private volatile String scriptToRun = null;
private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>();
private final ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
private final List<PropertyDescriptor> descriptors = Arrays.asList(
RECORD_READER,
RECORD_WRITER,
LANGUAGE,
ScriptingComponentUtils.SCRIPT_BODY,
ScriptingComponentUtils.SCRIPT_FILE,
ScriptingComponentUtils.MODULES);
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
@OnScheduled
public void setup(final ProcessContext context) throws IOException {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
scriptingComponentHelper.setupVariables(context);
// Create a script engine for each possible task
final int maxTasks = context.getMaxConcurrentTasks();
scriptingComponentHelper.setup(maxTasks, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();
if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptingComponentHelper.getScriptPath())) {
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
if (scriptEngine == null) {
// This shouldn't happen. But just in case.
session.rollback();
return;
}
try {
final ScriptEvaluator evaluator;
try {
evaluator = createEvaluator(scriptEngine, flowFile);
} catch (final ScriptException se) {
getLogger().error("Failed to initialize script engine", se);
session.transfer(flowFile, REL_FAILURE);
return;
}
transform(flowFile, evaluator, context, session);
} finally {
scriptingComponentHelper.engineQ.offer(scriptEngine);
}
}
private void transform(final FlowFile flowFile, final ScriptEvaluator evaluator, final ProcessContext context, final ProcessSession session) {
final long startMillis = System.currentTimeMillis();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final AtomicLong recordIndex = new AtomicLong(0L);
final AtomicLong dropCount = new AtomicLong(0L);
try {
final Map<String, String> attributesToAdd = new HashMap<>();
// Read each record, transform it, and write out the transformed version
session.write(flowFile, (in, out) -> {
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), reader.getSchema(), out, flowFile)) {
writer.beginRecordSet();
Record inputRecord;
while ((inputRecord = reader.nextRecord()) != null) {
final long index = recordIndex.get();
// Evaluate the script against the Record
final Object returnValue = evaluator.evaluate(inputRecord, index);
recordIndex.getAndIncrement();
// If a null value was returned, drop the Record
if (returnValue == null) {
getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", new Object[]{index, inputRecord, flowFile});
dropCount.getAndIncrement();
continue;
}
// If a single Record was returned, write it out
if (returnValue instanceof Record) {
final Record transformedRecord = (Record) returnValue;
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[] {index, inputRecord, transformedRecord, flowFile});
writer.write(transformedRecord);
continue;
}
// If a Collection was returned, ensure that every element in the collection is a Record and write them out
if (returnValue instanceof Collection) {
final Collection<?> collection = (Collection<?>) returnValue;
getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[] {index, inputRecord, collection, flowFile});
for (final Object element : collection) {
if (!(element instanceof Record)) {
throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
+ " but instead of returning a Record or Collection of Records, script returned a Collection of values, "
+ "at least one of which was not a Record but instead was: " + returnValue);
}
writer.write((Record) element);
}
continue;
}
// Ensure that the value returned from the script is either null or a Record
throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
+ " but instead of returning a Record, script returned a value of: " + returnValue);
}
// Add WriteResults to the attributes to be added to the FlowFile
final WriteResult writeResult = writer.finishRecordSet();
attributesToAdd.put("mime.type", writer.getMimeType());
attributesToAdd.putAll(writeResult.getAttributes());
attributesToAdd.put("record.count", String.valueOf(writeResult.getRecordCount()));
} catch (final MalformedRecordException | SchemaNotFoundException | ScriptException e) {
throw new ProcessException(e);
}
});
// Add the necessary attributes to the FlowFile and transfer to success
session.putAllAttributes(flowFile, attributesToAdd);
session.transfer(flowFile, REL_SUCCESS);
final long transformCount = recordIndex.get() - dropCount.get();
getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", new Object[] {transformCount, dropCount.get(), flowFile});
session.adjustCounter("Records Transformed", transformCount, true);
session.adjustCounter("Records Dropped", dropCount.get(), true);
final long millis = System.currentTimeMillis() - startMillis;
session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + dropCount.get() + " Records", millis);
} catch (final ProcessException e) {
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {recordIndex.get(), flowFile}, e.getCause());
session.transfer(flowFile, REL_FAILURE);
} catch (final Exception e) {
getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {recordIndex.get(), flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
}
}
private ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, final FlowFile flowFile) throws ScriptException {
if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
final CompiledScript compiledScript = getOrCompileScript((Compilable) scriptEngine, scriptToRun);
return new PythonScriptEvaluator(scriptEngine, compiledScript, flowFile);
}
return new InterpretedScriptEvaluator(scriptEngine, scriptToRun, flowFile);
}
private CompiledScript getOrCompileScript(final Compilable scriptEngine, final String scriptToRun) throws ScriptException {
final CompiledScript existing = compiledScriptRef.get();
if (existing != null) {
return existing;
}
final CompiledScript compiled = scriptEngine.compile(scriptToRun);
final boolean updated = compiledScriptRef.compareAndSet(null, compiled);
if (updated) {
return compiled;
}
return compiledScriptRef.get();
}
private static Bindings setupBindings(final ScriptEngine scriptEngine) {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
if (bindings == null) {
bindings = new SimpleBindings();
}
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
return bindings;
}
@Override
public Collection<SearchResult> search(final SearchContext context) {
return ScriptingComponentUtils.search(context, getLogger());
}
private interface ScriptEvaluator {
Object evaluate(Record record, long index) throws ScriptException;
}
private class PythonScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final CompiledScript compiledScript;
private final Bindings bindings;
public PythonScriptEvaluator(final ScriptEngine scriptEngine, final CompiledScript compiledScript, final FlowFile flowFile) throws ScriptException {
// By pre-compiling the script here, we get significant performance gains. A quick 5-minute benchmark
// shows gains of about 100x better performance. But even with the compiled script, performance pales
// in comparison with Groovy.
this.compiledScript = compiledScript;
this.scriptEngine = scriptEngine;
this.bindings = setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", getLogger());
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
compiledScript.eval(bindings);
return scriptEngine.get("_");
}
}
private class InterpretedScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final String scriptToRun;
private final Bindings bindings;
public InterpretedScriptEvaluator(final ScriptEngine scriptEngine, final String scriptToRun, final FlowFile flowFile) {
this.scriptEngine = scriptEngine;
this.scriptToRun = scriptToRun;
this.bindings = setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", getLogger());
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
// Evaluate the script with the configurator (if it exists) or the engine
return scriptEngine.eval(scriptToRun, bindings);
}
}
}

View File

@ -16,10 +16,20 @@
*/
package org.apache.nifi.script;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@ -40,18 +50,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.util.StringUtils;
/**
* This class contains variables and methods common to scripting processors, reporting tasks, etc.
@ -290,7 +288,7 @@ public class ScriptingComponentHelper {
}
}
public void setupVariables(ProcessContext context) {
public void setupVariables(final PropertyContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
@ -302,17 +300,6 @@ public class ScriptingComponentHelper {
}
}
public void setupVariables(ConfigurationContext context) {
scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
scriptPath = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
scriptBody = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(modulePath)) {
modules = modulePath.split(",");
} else {
modules = new String[0];
}
}
/**
* Provides a ScriptEngine corresponding to the currently selected script engine name.

View File

@ -16,11 +16,27 @@
*/
package org.apache.nifi.script;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeSet;
/**
* Utility methods and constants used by the scripting components.
@ -64,5 +80,50 @@ public class ScriptingComponentUtils {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static Collection<SearchResult> search(final SearchContext context, final ComponentLog logger) {
final Collection<SearchResult> results = new ArrayList<>();
final String term = context.getSearchTerm();
final String scriptFile = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
String script = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
if (StringUtils.isBlank(script)) {
try {
script = IOUtils.toString(new FileInputStream(scriptFile), StandardCharsets.UTF_8);
} catch (Exception e) {
logger.error(String.format("Could not read from path %s", scriptFile), e);
return results;
}
}
final Scanner scanner = new Scanner(script);
int index = 1;
while (scanner.hasNextLine()) {
final String line = scanner.nextLine();
if (StringUtils.containsIgnoreCase(line, term)) {
final String text = String.format("Matched script at line %d: %s", index, line);
results.add(new SearchResult.Builder().label(text).match(term).build());
}
index++;
}
return results;
}
public static Set<String> getAvailableEngines() {
final ScriptEngineManager scriptEngineManager = new ScriptEngineManager();
final List<ScriptEngineFactory> scriptEngineFactories = scriptEngineManager.getEngineFactories();
final Set<String> engines = new TreeSet<>();
for (ScriptEngineFactory factory : scriptEngineFactories) {
engines.add(factory.getLanguageName());
}
return engines;
}
}

View File

@ -12,5 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.script.InvokeScriptedProcessor
org.apache.nifi.processors.script.ExecuteScript
org.apache.nifi.processors.script.ScriptedTransformRecord

View File

@ -0,0 +1,421 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ScriptedTransformRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>ScriptedTransformRecord</h1>
<h3>Description</h3>
<p>
The ScriptedTransformRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily update the contents of a Record.
NiFi provides several different Processors that can be used to manipulate Records in different ways. Each of these processors has its pros and cons. The ScriptedTransformRecord is perhaps
the most powerful and most versatile option. However, it is also the most error-prone, as it depends on writing custom scripts. It is also likely to yield the lowest performance,
as processors and libraries written directly in Java are likely to perform better than interpreted scripts.
</p>
<p>
When creating a script, it is important to note that, unlike ExecuteScript, this Processor does not allow the script itself to expose Properties to be configured or define Relationships.
This is a deliberate decision. If it is necessary to expose such configuration, the ExecuteScript processor should be used instead. By not exposing these elements,
the script avoids the need to define a Class or implement methods with a specific method signature. Instead, the script can avoid any boilerplate code and focus purely on the task
at hand.
</p>
<p>
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return a Record object
(See note below regarding <a href="#ReturnValue">Return Values</a>).
That Record is then written using the configured Record Writer. If the script returns a <code>null</code> value, the Record will not be written. If the script returns an object that is not
a Record, the incoming FlowFile will be routed to the <code>failure</code> relationship.
</p>
<p>
This processor maintains two Counters: "Records Transformed" indicating the number of Records that were passed to the script and for which the script returned a Record, and "Records Dropped"
indicating the number of Records that were passed to the script and for which the script returned a value of <code>null</code>.
</p>
<h3>Variable Bindings</h3>
<p>
While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information
that it needs in order to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, each of the following variables will be made
available to the script:
</p>
<table>
<tr>
<th>Variable Name</th>
<th>Description</th>
<th>Variable Class</th>
</tr>
<tr>
<td>record</td>
<td>The Record that is to be transformed.</td>
<td><a href="https://javadoc.io/static/org.apache.nifi/nifi-record/1.11.4/org/apache/nifi/serialization/record/Record.html">Record</a></td>
</tr>
<tr>
<td>recordIndex</td>
<td>The zero-based index of the Record in the FlowFile.</td>
<td>Long (64-bit signed integer)</td>
</tr>
<tr>
<td>log</td>
<td>The Processor's Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any
log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN).</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/logging/ComponentLog.html">ComponentLog</a></td>
</tr>
<tr>
<td>attributes</td>
<td>Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable.
Any attempt to modify it will result in an UnsupportedOperationException being thrown.</td>
<td>java.util.Map</td>
</tr>
</table>
<a name="ReturnValue"></a>
<h3>Return Value</h3>
<p>
Each time that the script is invoked, it is expected to return a
<a href="https://javadoc.io/static/org.apache.nifi/nifi-record/1.11.4/org/apache/nifi/serialization/record/Record.html">Record</a> object or a Collection of Record objects.
Those Records are then written using the configured Record Writer. If the script returns a <code>null</code> value, the Record will not be written. If the script returns an object that is not
a Record or Collection of Records, the incoming FlowFile will be routed to the <code>failure</code> relationship.
</p>
<p>
Note that the Python language does not a script to use a <code>return</code> outside of a method. Additionally, when interpreted as a script,
the Java Python scripting engine does not provide a reliable way to easily obtain the last value referenced. As a result, any Python script must assign the value to be returned
to the <code>_</code> variable. See examples below.
</p>
<p>
The Record that is provided to the script is mutable. Therefore, it is a common pattern to update the <code>record</code> object in the script and simply return that same
<code>record</code> object.
</p>
<p>
<b>Note:</b> Depending on the scripting language, a script with no explicit return value may return <code>null</code> or may return the last value that was referenced.
Because returning <code>null</code> will result in dropping the Record and a non-Record return value will result in an Exception (and simply for the sake of clarity),
it is important to ensure that the configured script has an explicit return value.
</p>
<h3>Performance Considerations</h3>
<p>
NiFi offers many different processors for updating records in various ways. While each of these has its own pros and cons, performance is often an important consideration.
It is generally the case that standard processors, such as UpdateRecord, will perform better than script-oriented processors. However, this may not always be the case. For
situations when performance is critical, the best case is to test both approaches to see which performs best.
</p>
<p>
It is important to note also, though, that not all scripting languages and script engines are equal. For example, Groovy scripts will typically run much faster than Jython scripts.
However, if those in your organization are more familiar with Python than Java or Groovy, then using Jython may still make more sense.
</p>
<p>
A simple 5-minute benchmark was done to analyze the difference in performance. The script used simply modifies one field and return the Record otherwise unmodified.
The results are shown below. Note that no specifics are given with regards to hardware, specifically because the results should not be used to garner expectations of
absolute performance but rather to show relative performance between the different options.
</p>
<table>
<tr>
<th>Processor</th>
<th>Script Used</th>
<th>Records processed in 5 minutes</th>
</tr>
<tr>
<td>UpdateAttribute</td>
<td><i>No Script. User-defined Property added with name /num and value 42</i></td>
<td>50.1 million</td>
</tr>
<tr>
<td>ScriptedTransformRecord - Using Language: Groovy</td>
<td>
<pre><code>record.setValue("num", 42)
record
</code></pre>
</td>
<td>18.9 million</td>
</tr>
<tr>
<td>ScriptedTransformRecord - Using Language: python</td>
<td>
<pre><code>record.setValue("num", 42)
_ = record
</code></pre>
</td>
<td>21.0 million</td>
</tr>
<tr>
<td>ScriptedTransformRecord - Using Language: ruby</td>
<td>
<pre><code>record.setValue("num", 42)
record
</code></pre>
</td>
<td>2.67 million</td>
</tr>
</table>
<h2>Example Scripts</h2>
<h3>Remove First Record</h3>
<p>
The following script will remove the first Record from each FlowFile that it encounters.
</p>
<p>
Example Input (CSV):
</p>
<pre>
<code>
name, num
Mark, 42
Felicia, 3720
Monica, -3
</code>
</pre>
<p>
Example Output (CSV):
</p>
<pre>
<code>
name, num
Felicia, 3720
Monica, -3
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
return recordIndex == 0 ? null : record
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = None if (recordIndex == 0) else record
</code>
</pre>
<h3>Replace Field Value</h3>
<p>
The following script will replace any field in a Record if the value of that field is equal to the value of the "Value To Replace" attribute.
The value of that field will be replaced with whatever value is in the "Replacement Value" attribute.
</p>
<p>
Example Input Content (JSON):
</p>
<pre>
<code>
[{
"book": {
"author": "John Doe",
"date": "01/01/1980"
}
}, {
"book": {
"author": "Jane Doe",
"date": "01/01/1990"
}
}]
</code>
</pre>
<p>
Example Input Attributes:
</p>
<table>
<tr>
<th>Attribute Name</th>
<th>Attribute Value</th>
</tr>
<tr>
<td>Value To Replace</td>
<td>Jane Doe</td>
</tr>
<tr>
<td>Replacement Value</td>
<td>Author Unknown</td>
</tr>
</table>
<p>
Example Output (JSON):
</p>
<pre>
<code>
[{
"book": {
"author": "John Doe",
"date": "01/01/1980"
}
}, {
"book": {
"author": "Author Unknown",
"date": "01/01/1990"
}
}]
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
def replace(rec) {
rec.toMap().each { k, v ->
// If the field value is equal to the attribute 'Value to Replace', then set the
// field value to the 'Replacement Value' attribute.
if (v?.toString()?.equals(attributes['Value to Replace'])) {
rec.setValue(k, attributes['Replacement Value'])
}
// Call Recursively if the value is a Record
if (v instanceof org.apache.nifi.serialization.record.Record) {
replace(v)
}
}
}
replace(record)
return record
</code>
</pre>
<h3>Pass-through</h3>
<p>
The following script allows each Record to pass through without altering the Record in any way.
</p>
<p>
Example Input: &lt;any&gt;
</p>
<p>
Example output: &lt;identical to input&gt;
</p>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
record
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = record
</code>
</pre>
<h3>Fork Record</h3>
<p>The following script return each Record that it encounters, plus another Record, which is derived from the first, but where the 'num' field is one less than the 'num' field of the input.</p>
<p>
Example Input (CSV):
</p>
<pre>
<code>
name, num
Mark, 42
Felicia, 3720
Monica, -3
</code>
</pre>
<p>
Example Output (CSV):
</p>
<pre>
<code>
name, num
Mark, 42
Mark, 41
Felicia, 3720
Felicia, 3719
Monica, -3
Monica, -4
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
import org.apache.nifi.serialization.record.*
def derivedValues = new HashMap(record.toMap())
derivedValues.put('num', derivedValues['num'] - 1)
derived = new MapRecord(record.schema, derivedValues)
return [record, derived]
</code>
</pre>
</body>
</html>

View File

@ -0,0 +1,371 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.ArrayListRecordReader;
import org.apache.nifi.serialization.record.ArrayListRecordWriter;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
public class TestScriptedTransformRecord {
private TestRunner testRunner;
private ArrayListRecordReader recordReader;
private ArrayListRecordWriter recordWriter;
@Test
public void testSimpleGroovyScript() throws InitializationException {
testPassThrough("Groovy", "record");
}
@Test
public void testSimpleJythonScript() throws InitializationException {
testPassThrough("python", "_ = record");
}
@Test
public void testSimpleRubyScript() throws InitializationException {
testPassThrough("ruby", "record");
}
private void testPassThrough(final String language, final String script) throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, language);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, script);
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 3)));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
final List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i=0; i < 3; i++) {
assertEquals(i + 1, recordsWritten.get(i).getAsInt("num").intValue());
}
}
@Test
public void testCollectionOfRecords() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "[record, record, record]");
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 3)));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "9");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
final List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(9, recordsWritten.size());
int recordCounter = 0;
for (int i=0; i < 3; i++) {
for (int j=0; j < 3; j++) {
assertEquals(i + 1, recordsWritten.get(recordCounter++).getAsInt("num").intValue());
}
}
}
@Test
public void testCollectionOfRecordsWithModification() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/ForkRecordWithValueDecremented.groovy");
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, Collections.singletonMap("num", 3)));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "6");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
final List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(6, recordsWritten.size());
final int[] expectedNums = new int[] {1, 0, 2, 1, 3, 2};
for (int i=0; i < 6; i++) {
assertEquals(expectedNums[i], recordsWritten.get(i).getAsInt("num").intValue());
}
}
@Test
public void testScriptThrowsException() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/UpdateThenThrow.groovy");
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 3)));
final MockFlowFile inputFlowFile = testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_FAILURE, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_FAILURE).get(0);
out.assertAttributeNotExists("record.count");
assertNull(testRunner.getCounterValue("Records Transformed"));
assertNull(testRunner.getCounterValue("Records Dropped"));
assertSame(inputFlowFile, out);
}
private Map<String, Object> mutableMap(final String key, final Object value) {
final Map<String, Object> mutable = new HashMap<>();
mutable.put(key, value);
return mutable;
}
@Test
public void testScriptTransformsRecord() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "record.setValue('i', recordIndex); record");
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 3)));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
final List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i=0; i < 3; i++) {
assertEquals(i + 1, recordsWritten.get(i).getAsInt("num").intValue());
assertEquals(i, recordsWritten.get(i).getAsInt("i").intValue());
}
}
@Test
public void testScriptReturnsNull() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "if (record.getAsInt('num') % 2 == 0) { return record; } else { return null; }");
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 3)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 4)));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "2");
assertEquals(2, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(2, testRunner.getCounterValue("Records Dropped").intValue());
final List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(2, recordsWritten.size());
assertEquals(2, recordsWritten.get(0).getAsInt("num").intValue());
assertEquals(4, recordsWritten.get(1).getAsInt("num").intValue());
}
@Test
public void testScriptReturnsWrongObject() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "if (recordIndex == 0) { return record; } else { return 88; }");
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 3)));
final MockFlowFile inputFlowFile = testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_FAILURE, 1);
assertEquals(1, recordWriter.getRecordsWritten().size());
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_FAILURE).get(0);
out.assertAttributeNotExists("record.count");
assertNull(testRunner.getCounterValue("Records Transformed"));
assertNull(testRunner.getCounterValue("Records Dropped"));
assertSame(inputFlowFile, out);
}
@Test
public void testScriptReturnsCollectionWithWrongObject() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "[record, record, 88, record]");
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 1)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 2)));
recordReader.addRecord(new MapRecord(schema, mutableMap("num", 3)));
final MockFlowFile inputFlowFile = testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_FAILURE, 1);
assertEquals(2, recordWriter.getRecordsWritten().size());
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_FAILURE).get(0);
out.assertAttributeNotExists("record.count");
assertNull(testRunner.getCounterValue("Records Transformed"));
assertNull(testRunner.getCounterValue("Records Dropped"));
assertSame(inputFlowFile, out);
}
@Test
public void testScriptWithFunctions() throws InitializationException {
final List<RecordField> bookFields = new ArrayList<>();
bookFields.add(new RecordField("author", RecordFieldType.STRING.getDataType()));
bookFields.add(new RecordField("date", RecordFieldType.STRING.getDataType()));
final RecordSchema bookSchema = new SimpleRecordSchema(bookFields);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("book", RecordFieldType.RECORD.getRecordDataType(bookSchema)));
final RecordSchema outerSchema = new SimpleRecordSchema(fields);
setup(outerSchema);
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/ReplaceFieldValue.groovy");
recordReader.addRecord(createBook("John Doe", "01/01/1980", bookSchema, outerSchema));
recordReader.addRecord(createBook("Jane Doe", "01/01/1990", bookSchema, outerSchema));
final Map<String, String> attributes = new HashMap<>();
attributes.put("Value to Replace", "Jane Doe");
attributes.put("Replacement Value", "Unknown Author");
testRunner.enqueue(new byte[0], attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
final MockFlowFile output = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
output.assertAttributeEquals("record.count", "2");
final List<Record> outputRecords = recordWriter.getRecordsWritten();
assertEquals("John Doe", outputRecords.get(0).getAsRecord("book", bookSchema).getValue("author"));
assertEquals("Unknown Author", outputRecords.get(1).getAsRecord("book", bookSchema).getValue("author"));
}
private Record createBook(final String author, final String date, final RecordSchema bookSchema, final RecordSchema outerSchema) {
final Map<String, Object> firstBookValues = new HashMap<>();
firstBookValues.put("author", author);
firstBookValues.put("date", date);
final Record firstBookRecord = new MapRecord(bookSchema, firstBookValues);
final Map<String, Object> book1ValueMap = mutableMap("book", firstBookRecord);
return new MapRecord(outerSchema, book1ValueMap);
}
private void setup(final RecordSchema schema) throws InitializationException {
testRunner = TestRunners.newTestRunner(ScriptedTransformRecord.class);
testRunner.setProperty(ScriptedTransformRecord.RECORD_READER, "record-reader");
testRunner.setProperty(ScriptedTransformRecord.RECORD_WRITER, "record-writer");
recordReader = new ArrayListRecordReader(schema);
recordWriter = new ArrayListRecordWriter(schema);
testRunner.addControllerService("record-reader", recordReader);
testRunner.addControllerService("record-writer", recordWriter);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "record"); // return the input
testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, "Groovy");
testRunner.enableControllerService(recordReader);
testRunner.enableControllerService(recordWriter);
}
private RecordSchema createSimpleNumberSchema() {
final RecordField recordField = new RecordField("num", RecordFieldType.INT.getDataType());
final List<RecordField> recordFields = Collections.singletonList(recordField);
final RecordSchema schema = new SimpleRecordSchema(recordFields);
return schema;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.serialization.record.*
def derivedValues = new HashMap(record.toMap())
derivedValues.put('num', derivedValues['num'] - 1)
derived = new MapRecord(record.schema, derivedValues)
return [record, derived]

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def replace(rec) {
rec.toMap().each { k, v ->
// If the field value is equal to the attribute 'Value to Replace', then set the
// field value to the 'Replacement Value' attribute.
if (v?.toString()?.equals(attributes['Value to Replace'])) {
rec.setValue(k, attributes['Replacement Value'])
}
// Call Recursively if the value is a Record
if (v instanceof org.apache.nifi.serialization.record.Record) {
replace(v)
}
}
}
replace(record)
return record

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
if (recordIndex == 0) {
record.setValue("num", 8);
} else {
throw new RuntimeException("Intentional Unit Test Exception");
}