NIFI-11523 Refining schema handling for ScriptedTransfromRecord

This closes #7226

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Bence Simon 2023-05-03 14:12:59 +02:00 committed by Mike Thomsen
parent 7570f1c783
commit 01e72d6b51
2 changed files with 23 additions and 12 deletions

View File

@ -137,6 +137,7 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Map<String, String> originalAttributes = flowFile.getAttributes();
final RecordCounts counts = new RecordCounts();
try {
@ -165,10 +166,8 @@ public class ScriptedTransformRecord extends ScriptedRecordProcessor {
record.incorporateInactiveFields();
if (writer == null) {
final RecordSchema writerSchema;
writerSchema = record.getSchema();
try {
final RecordSchema writerSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
writer = writerFactory.createWriter(getLogger(), writerSchema, out, flowFile);
} catch (SchemaNotFoundException e) {
throw new IOException(e);

View File

@ -33,6 +33,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -96,16 +97,17 @@ public class TestScriptedTransformRecord {
}
@Test
public void testAddFieldToSchema() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
public void testAddFieldToSchemaWhenWriterSchemaIsDefined() throws InitializationException {
final RecordSchema readSchema = createSimpleNumberSchema();
final RecordSchema writeSchema = createSchemaWithAddedValue();
setupWithDifferentSchemas(readSchema, writeSchema);
testRunner.removeProperty(ScriptingComponentUtils.SCRIPT_BODY);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/groovy/AddNewField.groovy");
recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 1))));
recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 2))));
recordReader.addRecord(new MapRecord(schema, new HashMap<>(Collections.singletonMap("num", 3))));
recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 1))));
recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 2))));
recordReader.addRecord(new MapRecord(readSchema, new HashMap<>(Collections.singletonMap("num", 3))));
testRunner.enqueue(new byte[0]);
@ -127,7 +129,6 @@ public class TestScriptedTransformRecord {
written.forEach(record -> assertEquals(88, record.getAsInt("added-value").intValue()));
}
@Test
public void testZeroRecordInput() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
@ -484,12 +485,16 @@ public class TestScriptedTransformRecord {
private void setup(final RecordSchema schema) throws InitializationException {
setupWithDifferentSchemas(schema, schema);
}
private void setupWithDifferentSchemas(final RecordSchema readerSchema, final RecordSchema writerSchema) throws InitializationException {
testRunner = TestRunners.newTestRunner(ScriptedTransformRecord.class);
testRunner.setProperty(ScriptedTransformRecord.RECORD_READER, "record-reader");
testRunner.setProperty(ScriptedTransformRecord.RECORD_WRITER, "record-writer");
recordReader = new ArrayListRecordReader(schema);
recordWriter = new ArrayListRecordWriter(schema);
recordReader = new ArrayListRecordReader(readerSchema);
recordWriter = new ArrayListRecordWriter(writerSchema);
testRunner.addControllerService("record-reader", recordReader);
testRunner.addControllerService("record-writer", recordWriter);
@ -507,4 +512,11 @@ public class TestScriptedTransformRecord {
return schema;
}
private RecordSchema createSchemaWithAddedValue() {
final RecordField recordFieldNum = new RecordField("num", RecordFieldType.INT.getDataType());
final RecordField recordFieldAdd = new RecordField("added-value", RecordFieldType.INT.getDataType());
final List<RecordField> recordFields = Arrays.asList(recordFieldNum, recordFieldAdd);
final RecordSchema schema = new SimpleRecordSchema(recordFields);
return schema;
}
}