NIFI-7572: Force ScriptedTransformRecord to recompile the script when started

This closes #4458.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matthew Burgess 2020-08-06 10:54:41 -04:00 committed by Andy LoPresto
parent 5cc7c0485a
commit 6488db1376
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
2 changed files with 62 additions and 0 deletions

View File

@ -183,6 +183,8 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset()); scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
} }
} }
// Always compile when first run
compiledScriptRef.set(null);
} }

View File

@ -333,6 +333,66 @@ public class TestScriptedTransformRecord {
assertEquals("Unknown Author", outputRecords.get(1).getAsRecord("book", bookSchema).getValue("author")); assertEquals("Unknown Author", outputRecords.get(1).getAsRecord("book", bookSchema).getValue("author"));
} }
@Test
public void testRecompileJythonScript() throws InitializationException {
final RecordSchema schema = createSimpleNumberSchema();
setup(schema);
testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, "python");
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "_ = record");
final Map<String, Object> num1 = new HashMap<>();
num1.put("num", 1);
final Map<String, Object> num2 = new HashMap<>();
num2.put("num", 2);
final Map<String, Object> num3 = new HashMap<>();
num3.put("num", 3);
recordReader.addRecord(new MapRecord(schema, num1));
recordReader.addRecord(new MapRecord(schema, num2));
recordReader.addRecord(new MapRecord(schema, num3));
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
MockFlowFile out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
assertEquals(3, testRunner.getCounterValue("Records Transformed").intValue());
assertEquals(0, testRunner.getCounterValue("Records Dropped").intValue());
List<Record> recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, recordsWritten.get(i).getAsInt("num").intValue());
}
testRunner.clearTransferState();
// reset the writer
testRunner.removeControllerService(recordWriter);
recordWriter = new ArrayListRecordWriter(schema);
testRunner.addControllerService("record-writer", recordWriter);
testRunner.enableControllerService(recordWriter);
testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, "record.setValue(\"num\", 5)\n_ = record");
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ScriptedTransformRecord.REL_SUCCESS, 1);
out = testRunner.getFlowFilesForRelationship(ScriptedTransformRecord.REL_SUCCESS).get(0);
out.assertAttributeEquals("record.count", "3");
recordsWritten = recordWriter.getRecordsWritten();
assertEquals(3, recordsWritten.size());
for (int i = 0; i < 3; i++) {
assertEquals(5, recordsWritten.get(i).getAsInt("num").intValue());
}
}
private Record createBook(final String author, final String date, final RecordSchema bookSchema, final RecordSchema outerSchema) { private Record createBook(final String author, final String date, final RecordSchema bookSchema, final RecordSchema outerSchema) {
final Map<String, Object> firstBookValues = new HashMap<>(); final Map<String, Object> firstBookValues = new HashMap<>();
firstBookValues.put("author", author); firstBookValues.put("author", author);