mirror of https://github.com/apache/nifi.git
NIFI-7729: Ensure that if a script used in the ScriptedTransformRecord processor introduces a new field to the schema that the field gets incorporated into the schema.
NIFI-7729: Updated docs to explain how to add new fields to Records via ScriptedTransformRecord and added example. Fixed checkstyle violation.
NIFI-7729: Always call Record.incorporateInactiveFields with ScriptedTransformRecord

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4470
This commit is contained in:
parent
a79493082c
commit
869c4236c4
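In practical terms, the fix means a transform script can introduce a brand-new field and have it carried into the output schema. A minimal sketch of such a Script Body (illustrative only; the field name "greeting" is not from this commit, and it assumes the Record Writer inherits the Record's schema):

// "greeting" is absent from the reader's schema; with this fix it is tracked
// as an inactive field and incorporated into the Record Writer's schema.
record.setValue('greeting', 'hello')
return record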
@@ -330,7 +330,17 @@ public class MapRecord implements Record {

     @Override
     public void setValue(final String fieldName, final Object value) {
-        setValueAndGetField(fieldName, value);
+        final Optional<RecordField> existingField = setValueAndGetField(fieldName, value);
+
+        if (!existingField.isPresent()) {
+            if (inactiveFields == null) {
+                inactiveFields = new LinkedHashSet<>();
+            }
+
+            final DataType inferredDataType = DataTypeUtils.inferDataType(value, RecordFieldType.STRING.getDataType());
+            final RecordField field = new RecordField(fieldName, inferredDataType);
+            inactiveFields.add(field);
+        }
     }

     private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) {
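To make the MapRecord change concrete, here is a minimal Groovy sketch (not part of the commit; it uses real NiFi record classes, but the scenario is illustrative) showing how a value set for an unknown field becomes an inactive field and is later folded into the schema:

import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType

// The schema declares only a "name" field
def schema = new SimpleRecordSchema([new RecordField('name', RecordFieldType.STRING.getDataType())])
def record = new MapRecord(schema, [name: 'John Doe'])

// "age" is not in the schema, so setValue now records it as an inactive field,
// with a DataType inferred from the value (INT here, STRING as the fallback)
record.setValue('age', 42)
assert !record.getSchema().getField('age').isPresent()

// Incorporating the inactive fields merges "age" into the Record's schema
record.incorporateInactiveFields()
assert record.getSchema().getField('age').isPresent()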
@@ -36,11 +36,15 @@ import java.util.Map;

 public class ArrayListRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
     private final List<Record> records = new ArrayList<>();
     private final RecordSchema schema;
+    private volatile RecordSchema declaredSchema;

     public ArrayListRecordWriter(final RecordSchema schema) {
         this.schema = schema;
     }

+    public RecordSchema getDeclaredSchema() {
+        return declaredSchema;
+    }
+
     @Override
     public RecordSchema getSchema(final Map<String, String> variables, final RecordSchema readSchema) {

@@ -49,6 +53,7 @@ public class ArrayListRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {

     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
+        declaredSchema = schema;
         return new ArrayListRecordSetWriter(records, out);
     }
@@ -54,6 +54,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;

 import javax.script.Bindings;
 import javax.script.Compilable;

@@ -73,7 +74,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;

 @EventDriven

@@ -95,7 +95,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @SeeAlso(classNames = {"org.apache.nifi.processors.script.ExecuteScript",
     "org.apache.nifi.processors.standard.UpdateRecord",
     "org.apache.nifi.processors.standard.QueryRecord",
-    "org.apache.nifi.processors.standard.JoltTransformRecord",
+    "org.apache.nifi.processors.jolt.record.JoltTransformRecord",
     "org.apache.nifi.processors.standard.LookupRecord"})
 public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
     private static final String PYTHON_SCRIPT_LANGUAGE = "python";
@@ -224,69 +224,79 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

-        final AtomicLong recordIndex = new AtomicLong(0L);
-        final AtomicLong dropCount = new AtomicLong(0L);
-
+        final Counts counts = new Counts();
         try {
             final Map<String, String> attributesToAdd = new HashMap<>();

             // Read each record, transform it, and write out the transformed version
             session.write(flowFile, (in, out) -> {
-                try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
-                     final RecordSetWriter writer = writerFactory.createWriter(getLogger(), reader.getSchema(), out, flowFile)) {
+                final AtomicReference<RecordSetWriter> writerReference = new AtomicReference<>();

-                    writer.beginRecordSet();
+                try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+                    // We want to lazily create the Record Writer from the Record Writer Factory.
+                    // We do this because if the script adds new fields to the Records, we want to ensure that the Record Writer is
+                    // created with the appropriate schema that has those fields accounted for. We can only do this if we first transform the Record.
+                    // By lazily creating the Writer this way, we can incorporate any newly added fields to the first Record and then write it out.
+                    // Note that this means that any newly added field must be added to the first Record, even if the value is null. Otherwise, the field
+                    // will not make its way to the Record Writer's schema.
+                    final RecordWriteAction writeAction = new RecordWriteAction() {
+                        private RecordSetWriter writer = null;

-                    Record inputRecord;
-                    while ((inputRecord = reader.nextRecord()) != null) {
-                        final long index = recordIndex.get();
-
-                        // Evaluate the script against the Record
-                        final Object returnValue = evaluator.evaluate(inputRecord, index);
-
-                        recordIndex.getAndIncrement();
-
-                        // If a null value was returned, drop the Record
-                        if (returnValue == null) {
-                            getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", new Object[]{index, inputRecord, flowFile});
-                            dropCount.getAndIncrement();
-                            continue;
-                        }
-
-                        // If a single Record was returned, write it out
-                        if (returnValue instanceof Record) {
-                            final Record transformedRecord = (Record) returnValue;
-                            getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[] {index, inputRecord, transformedRecord, flowFile});
-                            writer.write(transformedRecord);
-                            continue;
-                        }
-
-                        // If a Collection was returned, ensure that every element in the collection is a Record and write them out
-                        if (returnValue instanceof Collection) {
-                            final Collection<?> collection = (Collection<?>) returnValue;
-                            getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[] {index, inputRecord, collection, flowFile});
-
-                            for (final Object element : collection) {
-                                if (!(element instanceof Record)) {
-                                    throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
-                                        + " but instead of returning a Record or Collection of Records, script returned a Collection of values, "
-                                        + "at least one of which was not a Record but instead was: " + returnValue);
-                                }
-
-                                writer.write((Record) element);
+                        @Override
+                        public void write(final Record record) throws IOException {
+                            if (record == null) {
+                                return;
                             }

-                            continue;
+                            record.incorporateInactiveFields();
+
+                            if (writer == null) {
+                                final RecordSchema writerSchema;
+                                writerSchema = record.getSchema();
+
+                                try {
+                                    writer = writerFactory.createWriter(getLogger(), writerSchema, out, flowFile);
+                                } catch (SchemaNotFoundException e) {
+                                    throw new IOException(e);
+                                }
+
+                                writerReference.set(writer);
+                                writer.beginRecordSet();
+                            }
+
+                            writer.write(record);
                         }
+                    };

+                    final WriteResult writeResult;
+                    try {
+                        // Transform each Record.
+                        Record inputRecord;
+                        while ((inputRecord = reader.nextRecord()) != null) {
+                            processRecord(inputRecord, flowFile, counts, writeAction, evaluator);
+                        }

-                        // Ensure that the value returned from the script is either null or a Record
-                        throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
-                            + " but instead of returning a Record, script returned a value of: " + returnValue);
+                        // If there were no records written, we still want to create a Record Writer. We do this for two reasons.
+                        // Firstly, the beginning/ending of the Record Set may result in output being written.
+                        // Secondly, we obtain important attributes from the WriteResult.
+                        RecordSetWriter writer = writerReference.get();
+                        if (writer == null) {
+                            writer = writerFactory.createWriter(getLogger(), reader.getSchema(), out, flowFile);
+                            writer.beginRecordSet();
+                            writeResult = writer.finishRecordSet();
+                            attributesToAdd.put("mime.type", writer.getMimeType());
+                        } else {
+                            writeResult = writer.finishRecordSet();
+                            attributesToAdd.put("mime.type", writer.getMimeType());
+                        }
+                    } finally {
+                        final RecordSetWriter writer = writerReference.get();
+                        if (writer != null) {
+                            writer.close();
+                        }
                     }

                     // Add WriteResults to the attributes to be added to the FlowFile
-                    final WriteResult writeResult = writer.finishRecordSet();
-                    attributesToAdd.put("mime.type", writer.getMimeType());
                     attributesToAdd.putAll(writeResult.getAttributes());
                     attributesToAdd.put("record.count", String.valueOf(writeResult.getRecordCount()));
                 } catch (final MalformedRecordException | SchemaNotFoundException | ScriptException e) {
@@ -298,22 +308,69 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
             session.putAllAttributes(flowFile, attributesToAdd);
             session.transfer(flowFile, REL_SUCCESS);

-            final long transformCount = recordIndex.get() - dropCount.get();
-            getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", new Object[] {transformCount, dropCount.get(), flowFile});
+            final long transformCount = counts.getRecordCount() - counts.getDroppedCount();
+            getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", new Object[] {transformCount, counts.getDroppedCount(), flowFile});
             session.adjustCounter("Records Transformed", transformCount, true);
-            session.adjustCounter("Records Dropped", dropCount.get(), true);
+            session.adjustCounter("Records Dropped", counts.getDroppedCount(), true);

             final long millis = System.currentTimeMillis() - startMillis;
-            session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + dropCount.get() + " Records", millis);
+            session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + counts.getDroppedCount() + " Records", millis);
         } catch (final ProcessException e) {
-            getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {recordIndex.get(), flowFile}, e.getCause());
+            getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {counts.getRecordCount(), flowFile}, e.getCause());
             session.transfer(flowFile, REL_FAILURE);
         } catch (final Exception e) {
-            getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {recordIndex.get(), flowFile}, e);
+            getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[] {counts.getRecordCount(), flowFile}, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }

+    private void processRecord(final Record inputRecord, final FlowFile flowFile, final Counts counts, final RecordWriteAction recordWriteAction,
+                               final ScriptEvaluator evaluator) throws IOException, ScriptException {
+        final long index = counts.getRecordCount();
+
+        // Evaluate the script against the Record
+        final Object returnValue = evaluator.evaluate(inputRecord, index);
+
+        counts.incrementRecordCount();
+
+        // If a null value was returned, drop the Record
+        if (returnValue == null) {
+            getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", new Object[]{index, inputRecord, flowFile});
+            counts.incrementDroppedCount();
+            return;
+        }
+
+        // If a single Record was returned, write it out
+        if (returnValue instanceof Record) {
+            final Record transformedRecord = (Record) returnValue;
+            getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, transformedRecord, flowFile});
+            recordWriteAction.write(transformedRecord);
+            return;
+        }
+
+        // If a Collection was returned, ensure that every element in the collection is a Record and write them out
+        if (returnValue instanceof Collection) {
+            final Collection<?> collection = (Collection<?>) returnValue;
+            getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, collection, flowFile});
+
+            for (final Object element : collection) {
+                if (!(element instanceof Record)) {
+                    throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
+                        + " but instead of returning a Record or Collection of Records, script returned a Collection of values, "
+                        + "at least one of which was not a Record but instead was: " + returnValue);
+                }
+
+                recordWriteAction.write((Record) element);
+            }
+
+            return;
+        }
+
+        // Ensure that the value returned from the script is either null or a Record
+        throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile
+            + " but instead of returning a Record, script returned a value of: " + returnValue);
+    }
+
     private ScriptEvaluator createEvaluator(final ScriptEngine scriptEngine, final FlowFile flowFile) throws ScriptException {
         if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
             final CompiledScript compiledScript = getOrCompileScript((Compilable) scriptEngine, scriptToRun);
@@ -365,7 +422,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
         private final CompiledScript compiledScript;
         private final Bindings bindings;

-        public PythonScriptEvaluator(final ScriptEngine scriptEngine, final CompiledScript compiledScript, final FlowFile flowFile) throws ScriptException {
+        public PythonScriptEvaluator(final ScriptEngine scriptEngine, final CompiledScript compiledScript, final FlowFile flowFile) {
             // By pre-compiling the script here, we get significant performance gains. A quick 5-minute benchmark
             // shows gains of about 100x better performance. But even with the compiled script, performance pales
             // in comparison with Groovy.
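The pre-compilation that the comment above refers to is the standard JSR-223 pattern: compile the script once, then evaluate the CompiledScript per Record with fresh Bindings instead of re-parsing the source each time. A self-contained Groovy sketch of the pattern (illustrative, assuming a JSR-223 Groovy engine is on the classpath):

import javax.script.Bindings
import javax.script.Compilable
import javax.script.CompiledScript
import javax.script.ScriptEngine
import javax.script.ScriptEngineManager

ScriptEngine engine = new ScriptEngineManager().getEngineByName('groovy')
CompiledScript compiled = ((Compilable) engine).compile('value * 2')

// Compile once, evaluate many times with per-evaluation bindings
(0..<3).each { i ->
    Bindings bindings = engine.createBindings()
    bindings.put('value', i)
    println compiled.eval(bindings)   // prints 0, 2, 4
}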
@@ -411,4 +468,29 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
             return scriptEngine.eval(scriptToRun, bindings);
         }
     }
+
+    private static class Counts {
+        private long recordCount;
+        private long droppedCount;
+
+        public long getRecordCount() {
+            return recordCount;
+        }
+
+        public long getDroppedCount() {
+            return droppedCount;
+        }
+
+        public void incrementRecordCount() {
+            recordCount++;
+        }
+
+        public void incrementDroppedCount() {
+            droppedCount++;
+        }
+    }
+
+    private interface RecordWriteAction {
+        void write(Record record) throws IOException;
+    }
 }
@@ -128,6 +128,20 @@ td {text-align: left}

+        <a name="AddingNewFields"></a>
+        <h3>Adding New Fields</h3>
+
+        <p>
+            A very common usage of Record-oriented processors is to allow the Record Reader to infer its schema and have the Record Writer inherit the Record's schema.
+            In this scenario, it is important to note that the Record Writer will inherit the schema of the first Record that it encounters. Therefore, if the configured script
+            will add a new field to a Record, it is important to ensure that the field is added to all Records (with a <code>null</code> value where appropriate).
+        </p>
+
+        <p>
+            See the <a href="#AddingFieldExample">Adding New Fields</a> example for more details.
+        </p>
+
         <h3>Performance Considerations</h3>
@@ -370,6 +384,73 @@ _ = record

+        <a name="AddingFieldExample"></a>
+        <h3>Adding New Fields</h3>
+
+        <p>
+            The following script adds a new field named "favoriteColor" to all Records. Additionally, it adds an "isOdd" field, which is set to <code>true</code>
+            for Records whose index is odd (the second and fourth Records below) and <code>null</code> for all others.
+        </p>
+
+        <p>
+            It is important that all Records have the same schema. Since we want to add an "isOdd" field to Records 1 and 3, the schema for Records 0 and 2 must also account
+            for this. As a result, we will add the field to all Records but use a null value for Records whose index is not odd. See <a href="#AddingNewFields">Adding New Fields</a> for more information.
+        </p>
+
+        <p>
+            Example Input Content (CSV):
+        </p>
+        <pre>
+            <code>
+name, favoriteFood
+John Doe, Spaghetti
+Jane Doe, Pizza
+Jake Doe, Sushi
+June Doe, Hamburger
+            </code>
+        </pre>
+
+        <p>
+            Example Output (CSV):
+        </p>
+        <pre>
+            <code>
+name, favoriteFood, favoriteColor, isOdd
+John Doe, Spaghetti, Blue,
+Jane Doe, Pizza, Blue, true
+Jake Doe, Sushi, Blue,
+June Doe, Hamburger, Blue, true
+            </code>
+        </pre>
+
+        <p>
+            Example Script (Groovy):
+        </p>
+        <pre>
+            <code>
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+// Always set favoriteColor to Blue.
+// Because we are calling #setValue with a String as the field name, the field type will be inferred.
+record.setValue("favoriteColor", "Blue")
+
+// Set the 'isOdd' field to true if the record index is odd. Otherwise, set the 'isOdd' field to null.
+// Because the value may be null for the first Record (in fact, it always will be for this particular case),
+// we need to ensure that the Record Writer's schema is given the correct type for the field. As a result, we will not call
+// #setValue with a String as the field name but rather will pass a RecordField as the first argument, as the RecordField
+// allows us to specify the type of the field.
+// Also note that RecordField and RecordFieldType are imported above.
+record.setValue(new RecordField("isOdd", RecordFieldType.BOOLEAN.getDataType()), recordIndex % 2 == 1 ? true : null)
+
+return record
+            </code>
+        </pre>
+
+
         <h3>Fork Record</h3>

         <p>The following script returns each Record that it encounters, plus another Record, which is derived from the first but whose 'num' field is one less than the 'num' field of the input.</p>
@@ -37,10 +37,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;

 public class TestScriptedTransformRecord {
@@ -93,6 +95,83 @@ public class TestScriptedTransformRecord {
         }
     }

+    @Test
+    public void testAddFieldToSchema() throws InitializationException {
+        final RecordSchema schema = createSimpleNumberSchema();
+        setup(schema);
+
+        testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
+        testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/AddNewField.groovy");
+
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 1))));
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 2))));
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 3))));
+
+        testRunner.enqueue(new byte[0]);
+
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "3");
+        assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
+        assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
+
+        final RecordSchema declaredSchema = recordWriter.getDeclaredSchema();
+        final Optional<RecordField> addedValueOptionalField = declaredSchema.getField("added-value");
+        assertTrue(addedValueOptionalField.isPresent());
+        final RecordField addedField = addedValueOptionalField.get();
+        assertEquals(RecordFieldType.INT, addedField.getDataType().getFieldType());
+
+        final List<Record> written = recordWriter.getRecordsWritten();
+        written.forEach(record -> assertEquals(88, record.getAsInt("added-value").intValue()));
+    }
+
+    @Test
+    public void testZeroRecordInput() throws InitializationException {
+        final RecordSchema schema = createSimpleNumberSchema();
+        setup(schema);
+
+        testRunner.enqueue(new byte[0]);
+
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "0");
+        assertEquals(0, testRunner.getCounterValue("Records Transformed").intValue());
+        assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
+
+        final List<Record> written = recordWriter.getRecordsWritten();
+        assertTrue(written.isEmpty());
+    }
+
+    @Test
+    public void testAllRecordsFiltered() throws InitializationException {
+        final RecordSchema schema = createSimpleNumberSchema();
+        setup(schema);
+
+        testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "return null");
+
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 1))));
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 2))));
+        recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 3))));
+
+        testRunner.enqueue(new byte[0]);
+
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "0");
+        assertEquals(0, testRunner.getCounterValue("Records Transformed").intValue());
+        assertEquals(3, testRunner.getCounterValue("Records Dropped").intValue());
+
+        final List<Record> written = recordWriter.getRecordsWritten();
+        assertTrue(written.isEmpty());
+    }
+
     @Test
     public void testCollectionOfRecords() throws InitializationException {
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+record.setValue("added-value", 88)
+return record