NIFI-8273 Adding Scripted Record processors

This commit is contained in:
Bence Simon 2021-03-29 17:44:17 +02:00 committed by Mark Payne
parent 4a5fe698f7
commit db5b618550
20 changed files with 2334 additions and 218 deletions

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.Record;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
class InterpretedScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final String scriptToRun;
private final Bindings bindings;
InterpretedScriptEvaluator(final ScriptEngine scriptEngine, final String scriptToRun, final FlowFile flowFile, final ComponentLog logger) {
this.scriptEngine = scriptEngine;
this.scriptToRun = scriptToRun;
this.bindings = ScriptedTransformRecord.setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", logger);
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
// Evaluate the script with the configurator (if it exists) or the engine
return scriptEngine.eval(scriptToRun, bindings);
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.record.Record;
import javax.script.Bindings;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
class PythonScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final CompiledScript compiledScript;
private final Bindings bindings;
PythonScriptEvaluator(
final ScriptEngine scriptEngine,
final CompiledScript compiledScript,
final FlowFile flowFile,
final ComponentLog componentLog
) {
// By pre-compiling the script here, we get significant performance gains. A quick 5-minute benchmark
// shows gains of about 100x better performance. But even with the compiled script, performance pales
// in comparison with Groovy.
this.compiledScript = compiledScript;
this.scriptEngine = scriptEngine;
this.bindings = ScriptedTransformRecord.setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", componentLog);
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
compiledScript.eval(bindings);
return scriptEngine.get("_");
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
class RecordCounts {
private long recordCount;
private long droppedCount;
public long getRecordCount() {
return recordCount;
}
public long getDroppedCount() {
return droppedCount;
}
public void incrementRecordCount() {
recordCount++;
}
public void incrementDroppedCount() {
droppedCount++;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.serialization.record.Record;
import javax.script.ScriptException;
/**
* Used by scripted record processors to enclose script engines for different languages.
*/
interface ScriptEvaluator {
/**
* Evaluates the enclosed script using the record as argument. Returns with the script's return value.
*
* @param record The script to evaluate.
* @param index The index of the record.
*
* @return The return value of the evaluated script.
*
* @throws ScriptException In case of issues with the evaluations.
*/
Object evaluate(Record record, long index) throws ScriptException;
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.Relationship;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@Tags({"record", "filter", "script", "groovy", "jython", "python"})
@CapabilityDescription(
"This processor provides the ability to filter records out from FlowFiles using the user-provided script. " +
"Every record will be evaluated by the script which must return with a boolean value. " +
"Records with \"true\" result will be routed to the \"matching\" relationship in a batch. " +
"Other records will be filtered out."
)
@SeeAlso(classNames = {
"org.apache.nifi.processors.script.ScriptedTransformRecord",
"org.apache.nifi.processors.script.ScriptedValidateRecord",
"org.apache.nifi.processors.script.ScriptedPartitionRecord"
})
public class ScriptedFilterRecord extends ScriptedRouterProcessor<Boolean> {
static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("success")
.description(
"Matching records of the original FlowFile will be routed to this relationship. " +
"If there are no matching records, no FlowFile will be routed here."
)
.build();
static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder()
.name("original")
.description(
"After successful procession, the incoming FlowFile will be transferred to this relationship. " +
"This happens regardless the number of filtered or remaining records.")
.build();
static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description("In case of any issue during processing the incoming FlowFile, the incoming FlowFile will be routed to this relationship.")
.build();
private static Set<Relationship> RELATIONSHIPS = new HashSet<>();
static {
RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
}
public ScriptedFilterRecord() {
super(Boolean.class);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Relationship getOriginalRelationship() {
return RELATIONSHIP_ORIGINAL;
}
@Override
protected Relationship getFailureRelationship() {
return RELATIONSHIP_FAILURE;
}
@Override
protected Optional<Relationship> resolveRelationship(final Boolean scriptResult) {
return scriptResult ? Optional.of(RELATIONSHIP_SUCCESS) : Optional.empty();
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@SideEffectFree
@Tags({"record", "partition", "script", "groovy", "jython", "python", "segment", "split", "group", "organize"})
@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates the user provided script against "
+ "each record in the incoming flow file. Each record is then grouped with other records sharing the same partition and a FlowFile is created for each groups of records. " +
"Two records shares the same partition if the evaluation of the script results the same return value for both. Those will be considered as part of the same partition.")
@Restricted(restrictions = {
@Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
})
@WritesAttributes({
@WritesAttribute(attribute = "partition", description = "The partition of the outgoing flow file."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer."),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of partitioned FlowFiles generated from the parent FlowFile")
})
@SeeAlso(classNames = {
"org.apache.nifi.processors.script.ScriptedTransformRecord",
"org.apache.nifi.processors.script.ScriptedValidateRecord",
"org.apache.nifi.processors.script.ScriptedFilterRecord"
})
public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are successfully partitioned will be routed to this relationship")
.build();
static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder()
.name("original")
.description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.")
.build();
static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, "
+ "the unchanged FlowFile will be routed to this relationship")
.build();
private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
static {
RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
RELATIONSHIPS.add(RELATIONSHIP_SUCCESS);
RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ScriptRunner scriptRunner = pollScriptRunner();
boolean success = false;
try {
final ScriptEvaluator evaluator;
try {
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
evaluator = createEvaluator(scriptEngine, flowFile);
} catch (final ScriptException se) {
getLogger().error("Failed to initialize script engine", se);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
return;
}
success = partition(context, session, flowFile, evaluator);
} finally {
offerScriptRunner(scriptRunner);
}
session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : RELATIONSHIP_FAILURE);
}
private boolean partition(
final ProcessContext context,
final ProcessSession session,
final FlowFile incomingFlowFile,
final ScriptEvaluator evaluator
) {
final long startMillis = System.currentTimeMillis();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
final RecordCounts counts = new RecordCounts();
try {
session.read(incomingFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger())
) {
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
final Map<String, FlowFile> outgoingFlowFiles = new HashMap<>();
final Map<String, RecordSetWriter> recordSetWriters = new HashMap<>();
// Reading in records and evaluate script
while (pushBackSet.isAnotherRecord()) {
final Record record = pushBackSet.next();
final Object evaluatedValue = evaluator.evaluate(record, counts.getRecordCount());
getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, counts.getRecordCount(), evaluatedValue);
counts.incrementRecordCount();
final String partition = (evaluatedValue == null) ? null : evaluatedValue.toString();
RecordSetWriter writer = recordSetWriters.get(partition);
if (writer == null) {
final FlowFile outgoingFlowFile = session.create(incomingFlowFile);
final OutputStream out = session.write(outgoingFlowFile);
writer = writerFactory.createWriter(getLogger(), schema, out, outgoingFlowFile);
writer.beginRecordSet();
outgoingFlowFiles.put(partition, outgoingFlowFile);
recordSetWriters.put(partition, writer);
}
writer.write(record);
}
// Sending outgoing flow files
int fragmentIndex = 0;
for (final String partition : outgoingFlowFiles.keySet()) {
final RecordSetWriter writer = recordSetWriters.get(partition);
final FlowFile outgoingFlowFile = outgoingFlowFiles.get(partition);
final Map<String, String> attributes = new HashMap<>(incomingFlowFile.getAttributes());
attributes.put("mime.type", writer.getMimeType());
attributes.put("partition", partition);
attributes.put("fragment.index", String.valueOf(fragmentIndex));
attributes.put("fragment.count", String.valueOf(outgoingFlowFiles.size()));
try {
final WriteResult finalResult = writer.finishRecordSet();
final int outgoingFlowFileRecords = finalResult.getRecordCount();
attributes.put("record.count", String.valueOf(outgoingFlowFileRecords));
writer.close();
} catch (final IOException e) {
throw new ProcessException("Resources used for record writing might not be closed", e);
}
session.putAllAttributes(outgoingFlowFile, attributes);
session.transfer(outgoingFlowFile, RELATIONSHIP_SUCCESS);
fragmentIndex++;
}
final long millis = System.currentTimeMillis() - startMillis;
session.adjustCounter("Records Processed", counts.getRecordCount(), true);
session.getProvenanceReporter().fork(incomingFlowFile, outgoingFlowFiles.values(), "Processed " + counts.getRecordCount() + " Records", millis);
} catch (final ScriptException | SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("After processing " + counts.getRecordCount() + " Records, encountered failure when attempting to process " + incomingFlowFile, e);
}
}
});
return true;
} catch (final Exception e) {
getLogger().error("Failed to partition records due to: " + e.getMessage(), e);
return false;
}
}
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
abstract class ScriptedRecordProcessor extends AbstractProcessor implements Searchable {
protected static final String PYTHON_SCRIPT_LANGUAGE = "python";
protected static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
protected volatile String scriptToRun = null;
protected final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>();
private final ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("The Record Reader to use parsing the incoming FlowFile into Records")
.required(true)
.identifiesControllerService(RecordReaderFactory.class)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use for serializing Records after they have been transformed")
.required(true)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
.name("Script Engine")
.displayName("Script Language")
.description("The Language to use for the script")
.allowableValues(SCRIPT_OPTIONS)
.defaultValue("Groovy")
.required(true)
.build();
protected static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
RECORD_READER,
RECORD_WRITER,
LANGUAGE,
ScriptingComponentUtils.SCRIPT_BODY,
ScriptingComponentUtils.SCRIPT_FILE,
ScriptingComponentUtils.MODULES);
@OnScheduled
public void setup(final ProcessContext context) throws IOException {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources(false);
}
scriptingComponentHelper.setupVariables(context);
scriptToRun = scriptingComponentHelper.getScriptBody();
if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptingComponentHelper.getScriptPath())) {
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
// Create a script runner for each possible task
final int maxTasks = context.getMaxConcurrentTasks();
scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger());
// Always compile when first run
compiledScriptRef.set(null);
}
protected ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, final FlowFile flowFile) throws ScriptException {
if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
final CompiledScript compiledScript = getOrCompileScript((Compilable) scriptEngine, scriptToRun);
return new PythonScriptEvaluator(scriptEngine, compiledScript, flowFile, getLogger());
}
return new InterpretedScriptEvaluator(scriptEngine, scriptToRun, flowFile, getLogger());
}
private CompiledScript getOrCompileScript(final Compilable scriptEngine, final String scriptToRun) throws ScriptException {
final CompiledScript existing = compiledScriptRef.get();
if (existing != null) {
return existing;
}
final CompiledScript compiled = scriptEngine.compile(scriptToRun);
final boolean updated = compiledScriptRef.compareAndSet(null, compiled);
if (updated) {
return compiled;
}
return compiledScriptRef.get();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
@Override
public Collection<SearchResult> search(final SearchContext context) {
return ScriptingComponentUtils.search(context, getLogger());
}
protected static Bindings setupBindings(final ScriptEngine scriptEngine) {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
if (bindings == null) {
bindings = new SimpleBindings();
}
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
return bindings;
}
protected ScriptRunner pollScriptRunner() {
final ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
// This shouldn't happen. But just in case.
if (scriptRunner == null) {
throw new ProcessException("Could not acquire script runner!");
}
return scriptRunner;
}
protected void offerScriptRunner(ScriptRunner scriptRunner) {
scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
}
}

View File

@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@EventDriven
@SupportsBatching
@SideEffectFree
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Restricted(restrictions = {
@Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
})
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records within the flow file."),
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
})
public abstract class ScriptedRouterProcessor<T> extends ScriptedRecordProcessor {
private final Class<T> scriptResultType;
/**
* @param scriptResultType Defines the expected result type of the user-provided script.
*/
protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
this.scriptResultType = scriptResultType;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ScriptRunner scriptRunner = pollScriptRunner();
boolean success = false;
try {
final ScriptEvaluator evaluator;
try {
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
evaluator = createEvaluator(scriptEngine, flowFile);
} catch (final ScriptException se) {
getLogger().error("Failed to initialize script engine", se);
session.transfer(flowFile, getFailureRelationship());
return;
}
success = route(context, session, flowFile, evaluator);
} finally {
offerScriptRunner(scriptRunner);
}
session.transfer(flowFile, success ? getOriginalRelationship() : getFailureRelationship());
}
private boolean route(
final ProcessContext context,
final ProcessSession session,
final FlowFile incomingFlowFile,
final ScriptEvaluator evaluator
) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
final RecordCounts counts = new RecordCounts();
try {
session.read(incomingFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger())
) {
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
final Map<Relationship, FlowFile> outgoingFlowFiles = new HashMap<>();
final Map<Relationship, RecordSetWriter> recordSetWriters = new HashMap<>();
// Reading in records and evaluate script
while (pushBackSet.isAnotherRecord()) {
final Record record = pushBackSet.next();
final Object evaluatedValue = evaluator.evaluate(record, counts.getRecordCount());
getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, counts.getRecordCount(), evaluatedValue);
counts.incrementRecordCount();
if (evaluatedValue != null && scriptResultType.isInstance(evaluatedValue)) {
final Optional<Relationship> outgoingRelationship = resolveRelationship(scriptResultType.cast(evaluatedValue));
if (outgoingRelationship.isPresent()) {
if (!outgoingFlowFiles.containsKey(outgoingRelationship.get())) {
final FlowFile outgoingFlowFile = session.create(incomingFlowFile);
final OutputStream out = session.write(outgoingFlowFile);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, outgoingFlowFile);
writer.beginRecordSet();
outgoingFlowFiles.put(outgoingRelationship.get(), outgoingFlowFile);
recordSetWriters.put(outgoingRelationship.get(), writer);
}
recordSetWriters.get(outgoingRelationship.get()).write(record);
} else {
getLogger().debug("Record with evaluated value {} has no outgoing relationship determined", String.valueOf(evaluatedValue));
}
} else {
throw new ProcessException("Script returned a value of " + evaluatedValue
+ " but this Processor requires that the object returned be an instance of " + scriptResultType.getSimpleName());
}
}
// Sending outgoing flow files
for (final Relationship relationship : outgoingFlowFiles.keySet()) {
final RecordSetWriter writer = recordSetWriters.get(relationship);
final FlowFile outgoingFlowFile = outgoingFlowFiles.get(relationship);
final Map<String, String> attributes = new HashMap<>(incomingFlowFile.getAttributes());
attributes.put("mime.type", writer.getMimeType());
try {
final WriteResult finalResult = writer.finishRecordSet();
final int outgoingFlowFileRecords = finalResult.getRecordCount();
attributes.put("record.count", String.valueOf(outgoingFlowFileRecords));
writer.close();
} catch (final IOException e) {
throw new ProcessException("Resources used for record writing might not be closed", e);
}
session.putAllAttributes(outgoingFlowFile, attributes);
session.transfer(outgoingFlowFile, relationship);
}
} catch (final ScriptException | SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("After processing " + counts.getRecordCount() + " Records, encountered failure when attempting to process " + incomingFlowFile, e);
}
}
});
session.adjustCounter("Records Processed", counts.getRecordCount(), true);
return true;
} catch (final Exception e) {
getLogger().error("Failed to route records due to: " + e.getMessage(), e);
return false;
}
}
/**
* @return Returns with the relationship used for route the incoming FlowFile in case of successful processing.
*/
protected abstract Relationship getOriginalRelationship();
/**
* @return Returns with the relationship used for route the incoming FlowFile in case of unsuccessful processing.
*/
protected abstract Relationship getFailureRelationship();
/**
* Returns a relationship based on the script's result value. As the script uses a given record as input, this helps
* to dissolve the result value for the routing.
*
* @param scriptResult The value returned by the script.
*
* @return Returns with a relationship if there is one to determine based on the value. If it is not possible to determine
* an {code Optional#empty} is expected. Records with empty relationship will not be routed into any relationship (except for
* the original or failed).
*/
protected abstract Optional<Relationship> resolveRelationship(final T scriptResult);
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
@ -29,24 +28,14 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -56,17 +45,9 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -97,32 +78,7 @@ import java.util.concurrent.atomic.AtomicReference;
"org.apache.nifi.processors.standard.QueryRecord",
"org.apache.nifi.processors.jolt.record.JoltTransformRecord",
"org.apache.nifi.processors.standard.LookupRecord"})
public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
private static final String PYTHON_SCRIPT_LANGUAGE = "python";
private static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
static final PropertyDescriptor RECORD_READER = new Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("The Record Reader to use parsing the incoming FlowFile into Records")
.required(true)
.identifiesControllerService(RecordReaderFactory.class)
.build();
static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use for serializing Records after they have been transformed")
.required(true)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
static final PropertyDescriptor LANGUAGE = new Builder()
.name("Script Engine")
.displayName("Script Language")
.description("The Language to use for the script")
.allowableValues(SCRIPT_OPTIONS)
.defaultValue("Groovy")
.required(true)
.build();
public class ScriptedTransformRecord extends ScriptedRecordProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -134,19 +90,6 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
.description("Any FlowFile that cannot be transformed will be routed to this Relationship")
.build();
private volatile String scriptToRun = null;
private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>();
private final ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
private final List<PropertyDescriptor> descriptors = Arrays.asList(
RECORD_READER,
RECORD_WRITER,
LANGUAGE,
ScriptingComponentUtils.SCRIPT_BODY,
ScriptingComponentUtils.SCRIPT_FILE,
ScriptingComponentUtils.MODULES);
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
@ -157,38 +100,9 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
return DESCRIPTORS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
@OnScheduled
public void setup(final ProcessContext context) throws IOException {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources(false);
}
scriptingComponentHelper.setupVariables(context);
scriptToRun = scriptingComponentHelper.getScriptBody();
if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) {
try (final FileInputStream scriptStream = new FileInputStream(scriptingComponentHelper.getScriptPath())) {
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
// Create a script runner for each possible task
final int maxTasks = context.getMaxConcurrentTasks();
scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger());
// Always compile when first run
compiledScriptRef.set(null);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
@ -196,17 +110,12 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
return;
}
final ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
if (scriptRunner == null) {
// This shouldn't happen. But just in case.
session.rollback();
return;
}
final ScriptRunner scriptRunner = pollScriptRunner();
try {
final ScriptEvaluator evaluator;
try {
ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
evaluator = createEvaluator(scriptEngine, flowFile);
} catch (final ScriptException se) {
getLogger().error("Failed to initialize script engine", se);
@ -216,7 +125,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
transform(flowFile, evaluator, context, session);
} finally {
scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
offerScriptRunner(scriptRunner);
}
}
@ -226,7 +135,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Counts counts = new Counts();
final RecordCounts counts = new RecordCounts();
try {
final Map<String, String> attributesToAdd = new HashMap<>();
@ -326,7 +235,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
}
}
private void processRecord(final Record inputRecord, final FlowFile flowFile, final Counts counts, final RecordWriteAction recordWriteAction,
private void processRecord(final Record inputRecord, final FlowFile flowFile, final RecordCounts counts, final RecordWriteAction recordWriteAction,
final ScriptEvaluator evaluator) throws IOException, ScriptException {
final long index = counts.getRecordCount();
@ -373,125 +282,6 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
+ " but instead of returning a Record, script returned a value of: " + returnValue);
}
private ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, final FlowFile flowFile) throws ScriptException {
if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
final CompiledScript compiledScript = getOrCompileScript((Compilable) scriptEngine, scriptToRun);
return new PythonScriptEvaluator(scriptEngine, compiledScript, flowFile);
}
return new InterpretedScriptEvaluator(scriptEngine, scriptToRun, flowFile);
}
private CompiledScript getOrCompileScript(final Compilable scriptEngine, final String scriptToRun) throws ScriptException {
final CompiledScript existing = compiledScriptRef.get();
if (existing != null) {
return existing;
}
final CompiledScript compiled = scriptEngine.compile(scriptToRun);
final boolean updated = compiledScriptRef.compareAndSet(null, compiled);
if (updated) {
return compiled;
}
return compiledScriptRef.get();
}
private static Bindings setupBindings(final ScriptEngine scriptEngine) {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
if (bindings == null) {
bindings = new SimpleBindings();
}
scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
return bindings;
}
@Override
public Collection<SearchResult> search(final SearchContext context) {
return ScriptingComponentUtils.search(context, getLogger());
}
private interface ScriptEvaluator {
Object evaluate(Record record, long index) throws ScriptException;
}
private class PythonScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final CompiledScript compiledScript;
private final Bindings bindings;
public PythonScriptEvaluator(final ScriptEngine scriptEngine, final CompiledScript compiledScript, final FlowFile flowFile) {
// By pre-compiling the script here, we get significant performance gains. A quick 5-minute benchmark
// shows gains of about 100x better performance. But even with the compiled script, performance pales
// in comparison with Groovy.
this.compiledScript = compiledScript;
this.scriptEngine = scriptEngine;
this.bindings = setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", getLogger());
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
compiledScript.eval(bindings);
return scriptEngine.get("_");
}
}
private class InterpretedScriptEvaluator implements ScriptEvaluator {
private final ScriptEngine scriptEngine;
private final String scriptToRun;
private final Bindings bindings;
public InterpretedScriptEvaluator(final ScriptEngine scriptEngine, final String scriptToRun, final FlowFile flowFile) {
this.scriptEngine = scriptEngine;
this.scriptToRun = scriptToRun;
this.bindings = setupBindings(scriptEngine);
bindings.put("attributes", flowFile.getAttributes());
bindings.put("log", getLogger());
}
@Override
public Object evaluate(final Record record, final long index) throws ScriptException {
bindings.put("record", record);
bindings.put("recordIndex", index);
// Evaluate the script with the configurator (if it exists) or the engine
return scriptEngine.eval(scriptToRun, bindings);
}
}
private static class Counts {
private long recordCount;
private long droppedCount;
public long getRecordCount() {
return recordCount;
}
public long getDroppedCount() {
return droppedCount;
}
public void incrementRecordCount() {
recordCount++;
}
public void incrementDroppedCount() {
droppedCount++;
}
}
private interface RecordWriteAction {
void write(Record record) throws IOException;
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.Relationship;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@Tags({"record", "validate", "script", "groovy", "jython", "python"})
@CapabilityDescription(
"This processor provides the ability to validate records in FlowFiles using the user-provided script. " +
"The script is expected to have a record as incoming argument and return with a boolean value. " +
"Based on this result, the processor categorizes the records as \"valid\" or \"invalid\" and routes them to the respective relationship in batch. " +
"Additionally the original FlowFile will be routed to the \"original\" relationship or in case of unsuccessful processing, to the \"failed\" relationship."
)
@SeeAlso(classNames = {
"org.apache.nifi.processors.script.ScriptedTransformRecord",
"org.apache.nifi.processors.script.ScriptedFilterRecord",
"org.apache.nifi.processors.script.ScriptedPartitionRecord"
})
public class ScriptedValidateRecord extends ScriptedRouterProcessor<Boolean> {
static final Relationship RELATIONSHIP_VALID = new Relationship.Builder()
.name("valid")
.description(
"FlowFile containing the valid records from the incoming FlowFile will be routed to this relationship. " +
"If there are no valid records, no FlowFile will be routed to this Relationship.")
.build();
static final Relationship RELATIONSHIP_INVALID = new Relationship.Builder()
.name("invalid")
.description(
"FlowFile containing the invalid records from the incoming FlowFile will be routed to this relationship. " +
"If there are no invalid records, no FlowFile will be routed to this Relationship.")
.build();
static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder()
.name("original")
.description(
"After successful procession, the incoming FlowFile will be transferred to this relationship. " +
"This happens regardless the FlowFiles might routed to \"valid\" and \"invalid\" relationships.")
.build();
static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description("In case of any issue during processing the incoming flow file, the incoming FlowFile will be routed to this relationship.")
.build();
private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
static {
RELATIONSHIPS.add(RELATIONSHIP_VALID);
RELATIONSHIPS.add(RELATIONSHIP_INVALID);
RELATIONSHIPS.add(RELATIONSHIP_ORIGINAL);
RELATIONSHIPS.add(RELATIONSHIP_FAILURE);
}
public ScriptedValidateRecord() {
super(Boolean.class);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Relationship getOriginalRelationship() {
return RELATIONSHIP_ORIGINAL;
}
@Override
protected Relationship getFailureRelationship() {
return RELATIONSHIP_FAILURE;
}
@Override
protected Optional<Relationship> resolveRelationship(final Boolean scriptResult) {
return Optional.of(scriptResult ? RELATIONSHIP_VALID : RELATIONSHIP_INVALID);
}
}

View File

@ -15,4 +15,7 @@
org.apache.nifi.processors.script.InvokeScriptedProcessor
org.apache.nifi.processors.script.ExecuteScript
org.apache.nifi.processors.script.ScriptedTransformRecord
org.apache.nifi.processors.script.ScriptedTransformRecord
org.apache.nifi.processors.script.ScriptedValidateRecord
org.apache.nifi.processors.script.ScriptedFilterRecord
org.apache.nifi.processors.script.ScriptedPartitionRecord

View File

@ -0,0 +1,231 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ScriptedFilterRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>ScriptedFilterRecord</h1>
<h3>Description</h3>
<p>
The ScriptedFilterRecord Processor provides the ability to use a scripting language, such as Groovy or Jyton in order to remove Records from an incoming FlowFile.
NiFi provides several different Processors that can be used to work with Records in different ways. Each of these processors has its pros and cons.
The ScriptedFilterRecord is intended to work together with these processors and be used as a pre-processing step before processing the FlowFile with more performance consuming Processors, like ScriptedTransformRecord.
</p>
<p>
The Processor expects an user defined script in order to determine which Records should be kept and filtered out.
When creating a script, it is important to note that, unlike ExecuteScript, this Processor does not allow the script itself to expose Properties to be configured or define Relationships.
</p>
<p>
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return a <code>boolean</code> value, which is used as a basis of filtering:
For Records the script returns with a <code>true</code> value, the given Record will be included to the outgoing FlowFile which will be routed to the <code>success</code> Relationship.
For <code>false</code> values the given Record will not be added to the output.
In addition to this the incoming FlowFile will be transferred to the <code>original</code> Relationship without change.
If the script returns an object that is not considered as <code>boolean</code>, the incoming FlowFile will be routed to the <code>failure</code> Relationship instead and no FlowFile will be routed to the <code>success</code> Relationship.
</p>
<p>
This Processor maintains a Counter: "Records Processed" indicating the number of Records that were passed to the script regardless of the result of the filtering.
</p>
<h3>Variable Bindings</h3>
<p>
While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information
that it needs in order to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, each of the following variables will be made
available to the script:
</p>
<table>
<tr>
<th>Variable Name</th>
<th>Description</th>
<th>Variable Class</th>
</tr>
<tr>
<td>record</td>
<td>The Record that is to be processed.</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-record/latest/org/apache/nifi/serialization/record/Record.html">Record</a></td>
</tr>
<tr>
<td>recordIndex</td>
<td>The zero-based index of the Record in the FlowFile.</td>
<td>Long (64-bit signed integer)</td>
</tr>
<tr>
<td>log</td>
<td>The Processor's Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any
log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN).</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/logging/ComponentLog.html">ComponentLog</a></td>
</tr>
<tr>
<td>attributes</td>
<td>Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable.
Any attempt to modify it will result in an UnsupportedOperationException being thrown.</td>
<td>java.util.Map</td>
</tr>
</table>
<h3>Return Value</h3>
<p>
Each time the script is invoked, it is expected to return a <code>boolean</code> value. Return values other than <code>boolean</code>, including <code>null</code> value will be handled as
unexpected script behaviour and handled accordingly: the processing will be interrupted and the incoming FlowFile will be transferred to the <code>failure</code> relationship without further execution.
</p>
<h2>Example Scripts</h2>
<h3>Filtering based on position</h3>
<p>
The following script will keep only the first 2 Records from a FlowFile and filter out all the rest.
</p>
<p>
Example Input (CSV):
</p>
<pre>
<code>
name, allyOf
Decelea, Athens
Corinth, Sparta
Mycenae, Sparta
Potidaea, Athens
</code>
</pre>
<p>
Example Output (CSV):
</p>
<pre>
<code>
name, allyOf
Decelea, Athens
Corinth, Sparta
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
return recordIndex < 2 ? true : false
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = True if (recordIndex < 2) else False
</code>
</pre>
<h3>Filtering based on Record contents</h3>
<p>
The following script will filter the Records based on their content. Any Records satisfies the condition will be part of the FlowFile routed to the <code>success</code> Relationship.
</p>
<p>
Example Input (JSON):
</p>
<pre>
<code>
[
{
"city": "Decelea",
"allyOf": "Athens"
}, {
"city": "Corinth",
"allyOf": "Sparta"
}, {
"city": "Mycenae",
"allyOf": "Sparta"
}, {
"city": "Potidaea",
"allyOf": "Athens"
}
]
</code>
</pre>
<p>
Example Output (CSV):
</p>
<pre>
<code>
[
{
"city": "Decelea",
"allyOf": "Athens"
}, {
"city": "Potidaea",
"allyOf": "Athens"
}
]
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
if (record.getValue("allyOf") == "Athens") {
return true;
} else {
return false;
}
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
allyOf = record.getValue("allyOf")
_ = True if (allyOf == "Athens") else False
</code>
</pre>
</body>
</html>

View File

@ -0,0 +1,177 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ScriptedPartitionRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>ScriptedPartitionRecord</h1>
<h3>Description</h3>
<p>
The ScriptedPartitionRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily partition a Record based on the contents of it.
There are multiple ways to reach the same behaviour such as using PartitionRecord but working with user provided scripts opens a wide range of possibilities on the decision
logic of partitioning the individual records.
</p>
<p>
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return an <code>object</code> or a <code>null</code> value.
The string representation of the return value is used as "partition". The <code>null</code> value is handled separately without conversion into string.
All Records with the same partition then will be batched to one FlowFile and routed to the <code>success</code> Relationship.
</p>
<p>
This Processor maintains a Counter with the name of "Records Processed". This represents the number of processed Records regardless of partitioning.
</p>
<h3>Variable Bindings</h3>
<p>
While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information
that it needs in order to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, each of the following variables will be made
available to the script:
</p>
<table>
<tr>
<th>Variable Name</th>
<th>Description</th>
<th>Variable Class</th>
</tr>
<tr>
<td>record</td>
<td>The Record that is to be processed.</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-record/latest/org/apache/nifi/serialization/record/Record.html">Record</a></td>
</tr>
<tr>
<td>recordIndex</td>
<td>The zero-based index of the Record in the FlowFile.</td>
<td>Long (64-bit signed integer)</td>
</tr>
<tr>
<td>log</td>
<td>The Processor's Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any
log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN).</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/logging/ComponentLog.html">ComponentLog</a></td>
</tr>
<tr>
<td>attributes</td>
<td>Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable.
Any attempt to modify it will result in an UnsupportedOperationException being thrown.</td>
<td>java.util.Map</td>
</tr>
</table>
<h3>Return Value</h3>
<p>
The script is invoked separately for each Record. It is accepted to return with any Object values might be represented as string. This string value will be used as the partition of the given Record.
Alternately it is possible to return with <code>null</code> value.
Each time the script is invoked, it is expected to return a <code>boolean</code> value. Return values other than <code>boolean</code>, including <code>null</code> value will be handled as
unexpected script behaviour and handled accordingly: the processing will be interrupted and the incoming FlowFile will be transferred to the <code>failure</code> relationship without further execution.
</p>
<h2>Example</h2>
<p>
The following script
</p>
<p>
Example Input (CSV):
</p>
<pre>
<code>
starSystem, stellarType
Wolf 359, M
Epsilon Eridani, K
Tau Ceti, G
Groombridge 1618, K
Gliese 1, M
</code>
</pre>
<p>
Example Output 1 (CSV) - for partition "M":
</p>
<pre>
<code>
starSystem, stellarType
Wolf 359,M
Gliese 1,M
</code>
</pre>
<p>
Example Output 2 (CSV) - for partition "K":
</p>
<pre>
<code>
starSystem, stellarType
Epsilon Eridani,K
Groombridge 1618,K
</code>
</pre>
<p>
Example Output 3 (CSV) - for partition "G":
</p>
<pre>
<code>
starSystem, stellarType
Tau Ceti,G
</code>
</pre>
<p>
Note: the order of the outgoing FlowFiles is not guaranteed.
</p>
<p>
Example Script (Groovy):
</p>
<code>
return record.getValue("stellarType")
</code>
<p>
Example Script (Python):
</p>
<code>
_ = record.getValue("stellarType")
</code>
</body>
</html>

View File

@ -0,0 +1,264 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ScriptedValidateRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
h2 {margin-top: 4em}
h3 {margin-top: 3em}
td {text-align: left}
</style>
</head>
<body>
<h1>ScriptedValidateRecord</h1>
<h3>Description</h3>
<p>
The ScriptedValidateRecord Processor provides the ability to use a scripting language, such as Groovy or Jyton in order to validate Records in an incoming FlowFile.
NiFi provides several different Processors that can be used to work with Records in different ways. Each of these processors has its pros and cons.
The ScriptedValidateRecord is intended to work together with these processors and be used as a preliminary step in order to prevent more complex processing steps from working with incorrect Records.
</p>
<p>
The Processor expects an user defined script in order to determine the validity of the Records.
When creating a script, it is important to note that, unlike ExecuteScript, this Processor does not allow the script itself to expose Properties to be configured or define Relationships.
</p>
<p>
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return a <code>boolean</code> value, which is used to determine if the given Record is valid or not:
For Records the script returns with a <code>true</code> value, the given Record is considered valid and will be included to the outgoing FlowFile which will be routed to the <code>valid</code> Relationship.
For <code>false</code> values the given Record will be added to the FlowFile routed to the <code>invalid</code> Relationship.
Regardless of the number of incoming Records the outgoing Records will be batched. For one incoming FlowFile there could be no more than one FlowFile routed to the <code>valid</code> and the <code>invalid</code> Relationships.
In case of there are no valid or invalid Record presents there will be no transferred FlowFile for the respected Relationship.
In addition to this the incoming FlowFile will be transferred to the <code>original</code> Relationship without change.
If the script returns an object that is not considered as <code>boolean</code>, the incoming FlowFile will be routed to the <code>failure</code> Relationship instead and no FlowFile will be routed to the <code>valid</code> or <code>invalid</code> Relationships.
</p>
<p>
This Processor maintains a Counter: "Records Processed" indicating the number of Records were processed by the Processor.
</p>
<h3>Variable Bindings</h3>
<p>
While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information
that it needs in order to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, each of the following variables will be made
available to the script:
</p>
<table>
<tr>
<th>Variable Name</th>
<th>Description</th>
<th>Variable Class</th>
</tr>
<tr>
<td>record</td>
<td>The Record that is to be processed.</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-record/latest/org/apache/nifi/serialization/record/Record.html">Record</a></td>
</tr>
<tr>
<td>recordIndex</td>
<td>The zero-based index of the Record in the FlowFile.</td>
<td>Long (64-bit signed integer)</td>
</tr>
<tr>
<td>log</td>
<td>The Processor's Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any
log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN).</td>
<td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/logging/ComponentLog.html">ComponentLog</a></td>
</tr>
<tr>
<td>attributes</td>
<td>Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable.
Any attempt to modify it will result in an UnsupportedOperationException being thrown.</td>
<td>java.util.Map</td>
</tr>
</table>
<h3>Return Value</h3>
<p>
Each time the script is invoked, it is expected to return a <code>boolean</code> value. Return values other than <code>boolean</code>, including <code>null</code> value will be handled as
unexpected script behaviour and handled accordingly: the processing will be interrupted and the incoming FlowFile will be transferred to the <code>failure</code> relationship without further execution.
</p>
<h2>Example Scripts</h2>
<h3>Validating based on position</h3>
<p>
The following script will consider only the first 2 Records as valid.
</p>
<p>
Example Input (CSV):
</p>
<pre>
<code>
company, numberOfTrains
Boston & Maine Railroad, 3
Chesapeake & Ohio Railroad, 2
Pennsylvania Railroad, 4
Reading Railroad, 2
</code>
</pre>
<p>
Example Output (CSV) - valid Relationship:
</p>
<pre>
<code>
company, numberOfTrains
Boston & Maine Railroad, 3
Chesapeake & Ohio Railroad, 2
</code>
</pre>
<p>
Example Output (CSV) - invalid Relationship:
</p>
<pre>
<code>
company, numberOfTrains
Pennsylvania Railroad, 4
Reading Railroad, 2
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
return recordIndex < 2 ? true : false
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
_ = True if (recordIndex < 2) else False
</code>
</pre>
<h3>Validating based on Record contents</h3>
<p>
The following script will filter the Records based on their content. Any Records satisfies the condition will be part of
the FlowFile routed to the <code>valid</code> Relationship, others wil lbe routed to the <code>invalid</code> Relationship.
</p>
<p>
Example Input (JSON):
</p>
<pre>
<code>
[
{
"company": "Boston & Maine Railroad",
"numberOfTrains": 3
}, {
"company": "Chesapeake & Ohio Railroad",
"numberOfTrains": -1
}, {
"company": "Pennsylvania Railroad",
"numberOfTrains": 2
}, {
"company": "Reading Railroad",
"numberOfTrains": 4
}
]
</code>
</pre>
<p>
Example Output (CSV) - valid Relationship:
</p>
<pre>
<code>
[
{
"company": "Boston & Maine Railroad",
"numberOfTrains": 3
}, {
"company": "Pennsylvania Railroad",
"numberOfTrains": 2
}, {
"company": "Reading Railroad",
"numberOfTrains": 4
}
]
</code>
</pre>
<p>
Example Output (CSV) - invalid Relationship:
</p>
<pre>
<code>
[
{
"company": "Chesapeake & Ohio Railroad",
"numberOfTrains": -1
}
]
</code>
</pre>
<p>
Example Script (Groovy):
</p>
<pre>
<code>
if (record.getValue("numberOfTrains").toInteger() >= 0) {
return true;
} else {
return false;
}
</code>
</pre>
<p>
Example Script (Python):
</p>
<pre>
<code>
trains = record.getValue("numberOfTrains")
_ = True if (trains >= 0) else False
</code>
</pre>
</body>
</html>

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert;
import org.junit.Test;
public class TestScriptedFilterRecord extends TestScriptedRouterProcessor {
private static final String SCRIPT = "return record.getValue(\"first\") == 1";
private static final Object[] MATCHING_RECORD_1 = new Object[] {1, "lorem"};
private static final Object[] MATCHING_RECORD_2 = new Object[] {1, "ipsum"};
private static final Object[] NON_MATCHING_RECORD_1 = new Object[] {2, "lorem"};
private static final Object[] NON_MATCHING_RECORD_2 = new Object[] {2, "ipsum"};
@Test
public void testIncomingFlowFileContainsMatchingRecordsOnly() {
// given
recordReader.addRecord(MATCHING_RECORD_1);
recordReader.addRecord(MATCHING_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenMatchingFlowFileContains(MATCHING_RECORD_1, MATCHING_RECORD_2);
}
@Test
public void testIncomingFlowFileContainsNonMatchingRecordsOnly() {
// given
recordReader.addRecord(NON_MATCHING_RECORD_1);
recordReader.addRecord(NON_MATCHING_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenMatchingFlowFileIsEmpty();
}
@Test
public void testIncomingFlowFileContainsMatchingAndNonMatchingRecords() {
// given
recordReader.addRecord(MATCHING_RECORD_1);
recordReader.addRecord(NON_MATCHING_RECORD_1);
recordReader.addRecord(MATCHING_RECORD_2);
recordReader.addRecord(NON_MATCHING_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenMatchingFlowFileContains(MATCHING_RECORD_1, MATCHING_RECORD_2);
}
@Test
public void testIncomingFlowFileContainsNoRecords() {
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenMatchingFlowFileIsEmpty();
}
@Test
public void testIncomingFlowFileCannotBeRead() {
// given
recordReader.failAfter(0);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToFailed();
thenMatchingFlowFileIsEmpty();
}
private void thenMatchingFlowFileContains(final Object[]... records) {
testRunner.assertTransferCount(ScriptedFilterRecord.RELATIONSHIP_SUCCESS, 1);
final MockFlowFile resultFlowFile = testRunner.getFlowFilesForRelationship(ScriptedFilterRecord.RELATIONSHIP_SUCCESS).get(0);
Assert.assertEquals(givenExpectedFlowFile(records), resultFlowFile.getContent());
Assert.assertEquals("text/plain", resultFlowFile.getAttribute("mime.type"));
}
private void thenMatchingFlowFileIsEmpty() {
testRunner.assertTransferCount(ScriptedFilterRecord.RELATIONSHIP_SUCCESS, 0);
}
@Override
protected Class<? extends Processor> givenProcessorType() {
return ScriptedFilterRecord.class;
}
@Override
protected String getScriptBody() {
return SCRIPT;
}
@Override
protected Relationship getOriginalRelationship() {
return ScriptedFilterRecord.RELATIONSHIP_ORIGINAL;
}
@Override
protected Relationship getFailedRelationship() {
return ScriptedFilterRecord.RELATIONSHIP_FAILURE;
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class TestScriptedPartitionRecord extends TestScriptedRouterProcessor {
private static final String PARTITION_ATTRIBUTE = "partition";
private static final Object[] PARTITION_1_RECORD_1 = new Object[] {1, "lorem"};
private static final Object[] PARTITION_1_RECORD_2 = new Object[] {1, "ipsum"};
private static final Object[] PARTITION_2_RECORD_1 = new Object[] {2, "lorem"};
private static final Object[] PARTITION_3_RECORD_1 = new Object[] {3, "lorem"};
private static final Object[] PARTITION_4_RECORD_1 = new Object[] {4, "lorem"};
private static final String PARTITION_1 = "partition1";
private static final String PARTITION_2 = "partition2";
private static final Integer PARTITION_3 = 3;
private static final String PARTITION_4 = null;
@Test
public void testIncomingFlowFileContainsNoRecords() {
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenNoPartitionExists();
}
@Test
public void testWhenSinglePartitionAndSingleRecord() {
// given
recordReader.addRecord(PARTITION_1_RECORD_1);
// when
whenTriggerProcessor();
thenIncomingFlowFileIsRoutedToOriginal();
// then
thenTheFollowingPartitionsExists(PARTITION_1);
thenPartitionContains(PARTITION_1, 0, 1, PARTITION_1_RECORD_1);
}
@Test
public void testWhenSinglePartitionAndMultipleRecords() {
// given
recordReader.addRecord(PARTITION_1_RECORD_1);
recordReader.addRecord(PARTITION_1_RECORD_2);
// when
whenTriggerProcessor();
thenIncomingFlowFileIsRoutedToOriginal();
// then
thenTheFollowingPartitionsExists(PARTITION_1);
thenPartitionContains(PARTITION_1, 0, 1, PARTITION_1_RECORD_1, PARTITION_1_RECORD_2);
}
@Test
public void testWhenMultiplePartitions() {
// given
recordReader.addRecord(PARTITION_1_RECORD_1);
recordReader.addRecord(PARTITION_1_RECORD_2);
recordReader.addRecord(PARTITION_2_RECORD_1);
// when
whenTriggerProcessor();
thenIncomingFlowFileIsRoutedToOriginal();
// then
thenTheFollowingPartitionsExists(PARTITION_1, PARTITION_2);
thenPartitionContains(PARTITION_2, 0, 2, PARTITION_2_RECORD_1);
thenPartitionContains(PARTITION_1, 1, 2, PARTITION_1_RECORD_1, PARTITION_1_RECORD_2);
}
@Test
public void testWhenPartitionIsNotString() {
// given
recordReader.addRecord(PARTITION_4_RECORD_1);
// when
whenTriggerProcessor();
thenIncomingFlowFileIsRoutedToOriginal();
// then
thenTheFollowingPartitionsExists(PARTITION_4);
thenPartitionContains(PARTITION_4, 0, 1, PARTITION_4_RECORD_1);
}
@Test
public void testWhenPartitionIsNull() {
// given
recordReader.addRecord(PARTITION_3_RECORD_1);
// when
whenTriggerProcessor();
thenIncomingFlowFileIsRoutedToOriginal();
// then
thenTheFollowingPartitionsExists(PARTITION_3.toString());
thenPartitionContains(PARTITION_3.toString(), 0, 1, PARTITION_3_RECORD_1);
}
private void thenNoPartitionExists() {
Assert.assertEquals(0, testRunner.getFlowFilesForRelationship(ScriptedPartitionRecord.RELATIONSHIP_SUCCESS).size());
}
private void thenTheFollowingPartitionsExists(final String... partitions) {
final List<MockFlowFile> outgoingFlowFiles = testRunner.getFlowFilesForRelationship(ScriptedPartitionRecord.RELATIONSHIP_SUCCESS);
Assert.assertEquals(partitions.length, outgoingFlowFiles.size());
final Set<String> outgoingPartitions = outgoingFlowFiles.stream().map(ff -> ff.getAttribute(PARTITION_ATTRIBUTE)).collect(Collectors.toSet());
for (final String partition : partitions) {
Assert.assertTrue(outgoingPartitions.contains(partition));
}
}
private void thenPartitionContains(final String partition, int index, int count, final Object[]... records) {
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ScriptedPartitionRecord.RELATIONSHIP_SUCCESS);
Set<MockFlowFile> outgoingFlowFiles = new HashSet<>();
for (final MockFlowFile flowFile : flowFiles) {
// If the partition is deliberately <code>null</code>, we also check if the attribute is added to the collection of attributes.
// This is in order to differentiate from the situation where the "partition" attribute is not added at all.
if (partition == null && flowFile.getAttributes().containsKey(PARTITION_ATTRIBUTE) && flowFile.getAttribute(PARTITION_ATTRIBUTE) == null) {
outgoingFlowFiles.add(flowFile);
} else if (flowFile.getAttribute(PARTITION_ATTRIBUTE).equals(partition)) {
outgoingFlowFiles.add(flowFile);
}
}
Assert.assertEquals(1, outgoingFlowFiles.size());
final MockFlowFile resultFlowFile = outgoingFlowFiles.iterator().next();
Assert.assertEquals(givenExpectedFlowFile(records), resultFlowFile.getContent());
Assert.assertEquals("text/plain", resultFlowFile.getAttribute("mime.type"));
Assert.assertEquals(String.valueOf(index), resultFlowFile.getAttribute("fragment.index"));
Assert.assertEquals(String.valueOf(count), resultFlowFile.getAttribute("fragment.count"));
}
@Override
protected Class<? extends Processor> givenProcessorType() {
return ScriptedPartitionRecord.class;
}
@Override
protected String getScriptFile() {
return "src/test/resources/groovy/test_scripted_partition_record.groovy";
}
@Override
protected Relationship getOriginalRelationship() {
return ScriptedPartitionRecord.RELATIONSHIP_ORIGINAL;
}
@Override
protected Relationship getFailedRelationship() {
return ScriptedPartitionRecord.RELATIONSHIP_FAILURE;
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import java.util.UUID;
abstract class TestScriptedRouterProcessor {
private static final String HEADER = "header§";
protected TestRunner testRunner;
protected MockRecordParser recordReader;
protected MockRecordWriter recordWriter;
protected String incomingFlowFileContent;
@Before
public void setUp() throws Exception {
testRunner = TestRunners.newTestRunner(givenProcessorType());
testRunner.setProperty(ScriptedTransformRecord.RECORD_READER, "record-reader");
testRunner.setProperty(ScriptedTransformRecord.RECORD_WRITER, "record-writer");
recordReader = new MockRecordParser();
recordReader.addSchemaField("first", RecordFieldType.INT);
recordReader.addSchemaField("second", RecordFieldType.STRING);
recordWriter = new MockRecordWriter(HEADER);
testRunner.addControllerService("record-reader", recordReader);
testRunner.addControllerService("record-writer", recordWriter);
if (getScriptBody() != null) {
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_FILE);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getScriptBody());
} else if (getScriptFile() != null) {
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, getScriptFile());
}
testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, "Groovy");
testRunner.enableControllerService(recordReader);
testRunner.enableControllerService(recordWriter);
incomingFlowFileContent = UUID.randomUUID().toString();
}
protected void whenTriggerProcessor() {
testRunner.enqueue(incomingFlowFileContent);
testRunner.run();
}
protected void thenIncomingFlowFileIsRoutedToOriginal() {
testRunner.assertTransferCount(getOriginalRelationship(), 1);
testRunner.assertTransferCount(getFailedRelationship(), 0);
Assert.assertEquals(incomingFlowFileContent, testRunner.getFlowFilesForRelationship(getOriginalRelationship()).get(0).getContent());
}
protected void thenIncomingFlowFileIsRoutedToFailed() {
testRunner.assertTransferCount(getOriginalRelationship(), 0);
testRunner.assertTransferCount(getFailedRelationship(), 1);
Assert.assertEquals(incomingFlowFileContent, testRunner.getFlowFilesForRelationship(getFailedRelationship()).get(0).getContent());
}
/**
* Generates the expected flow file content based on the records. Results the same format as the {@code MockRecordWriter} uses.
*/
protected String givenExpectedFlowFile(final Object[]... records) {
final StringBuilder expectedFlowFile = new StringBuilder(HEADER).append('\n');
for (final Object[] record : records) {
for (int i = 0; i < record.length; i++) {
expectedFlowFile.append('"').append(record[i].toString()).append('"');
if (i < record.length -1 ) {
expectedFlowFile.append(',');
}
}
expectedFlowFile.append('\n');
}
return expectedFlowFile.toString();
}
protected abstract Class<? extends Processor> givenProcessorType();
protected abstract Relationship getOriginalRelationship();
protected abstract Relationship getFailedRelationship();
protected String getScriptBody() {
return null;
}
protected String getScriptFile() {
return null;
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert;
import org.junit.Test;
public class TestScriptedValidateRecord extends TestScriptedRouterProcessor {
private static final String SCRIPT = "return record.getValue(\"first\") == 1";
private static final Object[] VALID_RECORD_1 = new Object[] {1, "lorem"};
private static final Object[] VALID_RECORD_2 = new Object[] {1, "ipsum"};
private static final Object[] INVALID_RECORD_1 = new Object[] {2, "lorem"};
private static final Object[] INVALID_RECORD_2 = new Object[] {2, "ipsum"};
@Test
public void testIncomingFlowFileContainsValidRecordsOnly() throws Exception {
// given
recordReader.addRecord(VALID_RECORD_1);
recordReader.addRecord(VALID_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenValidFlowFileContains(VALID_RECORD_1, VALID_RECORD_2);
thenNoInvalidFlowFile();
}
@Test
public void testIncomingFlowFileContainsInvalidRecordsOnly() {
// given
recordReader.addRecord(INVALID_RECORD_1);
recordReader.addRecord(INVALID_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenInvalidFlowFileContains(INVALID_RECORD_1, INVALID_RECORD_2);
thenNoValidFlowFile();
}
@Test
public void testIncomingFlowFileContainsBothValidAndInvalidRecords() {
// given
recordReader.addRecord(VALID_RECORD_1);
recordReader.addRecord(INVALID_RECORD_1);
recordReader.addRecord(VALID_RECORD_2);
recordReader.addRecord(INVALID_RECORD_2);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenValidFlowFileContains(VALID_RECORD_1, VALID_RECORD_2);
thenInvalidFlowFileContains(INVALID_RECORD_1, INVALID_RECORD_2);
}
@Test
public void testIncomingFlowFileContainsNoRecords() {
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToOriginal();
thenNoValidFlowFile();
thenNoInvalidFlowFile();
}
@Test
public void testIncomingFlowFileCannotBeRead() {
// given
recordReader.failAfter(0);
// when
whenTriggerProcessor();
// then
thenIncomingFlowFileIsRoutedToFailed();
thenNoValidFlowFile();
thenNoInvalidFlowFile();
}
private void thenValidFlowFileContains(final Object[]... records) {
testRunner.assertTransferCount(ScriptedValidateRecord.RELATIONSHIP_VALID, 1);
final MockFlowFile resultFlowFile = testRunner.getFlowFilesForRelationship(ScriptedValidateRecord.RELATIONSHIP_VALID).get(0);
Assert.assertEquals(givenExpectedFlowFile(records), resultFlowFile.getContent());
Assert.assertEquals("text/plain", resultFlowFile.getAttribute("mime.type"));
}
private void thenInvalidFlowFileContains(final Object[]... records) {
testRunner.assertTransferCount(ScriptedValidateRecord.RELATIONSHIP_INVALID, 1);
final MockFlowFile resultFlowFile = testRunner.getFlowFilesForRelationship(ScriptedValidateRecord.RELATIONSHIP_INVALID).get(0);
Assert.assertEquals(givenExpectedFlowFile(records), resultFlowFile.getContent());
Assert.assertEquals("text/plain", resultFlowFile.getAttribute("mime.type"));
}
private void thenNoValidFlowFile() {
testRunner.assertTransferCount(ScriptedValidateRecord.RELATIONSHIP_VALID, 0);
}
private void thenNoInvalidFlowFile() {
testRunner.assertTransferCount(ScriptedValidateRecord.RELATIONSHIP_INVALID, 0);
}
@Override
protected Class<? extends Processor> givenProcessorType() {
return ScriptedValidateRecord.class;
}
@Override
protected String getScriptBody() {
return SCRIPT;
}
@Override
protected Relationship getOriginalRelationship() {
return ScriptedValidateRecord.RELATIONSHIP_ORIGINAL;
}
@Override
protected Relationship getFailedRelationship() {
return ScriptedValidateRecord.RELATIONSHIP_FAILURE;
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
if (record.getValue("first") == 1) {
return "partition1";
} else if (record.getValue("first") == 2) {
return "partition2";
} else if (record.getValue("first") == 3) {
return 3;
} else {
return null;
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License') you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
if (record.getValue("first") == 1) {
return "route1";
} else if (record.getValue("first") == 2) {
return "route2";
} else {
return "route3";
}