NIFI-13714 Fixed RecordTransform Python Processor Partition Handling (#9253)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Gabor Gyimesi 2024-09-21 15:18:42 +02:00 committed by GitHub
parent 729247976d
commit 426b8feaa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 92 additions and 1 deletions

View File

@ -87,8 +87,14 @@ class __RecordTransformResult__:
return self.processor_result.relationship return self.processor_result.relationship
def getPartition(self): def getPartition(self):
return self.processor_result.partition if self.processor_result.partition is None:
return None
map = JvmHolder.jvm.java.util.HashMap()
for key, value in self.processor_result.partition.items():
map.put(key, value)
return map
class RecordTransformResult: class RecordTransformResult:

View File

@ -171,6 +171,91 @@ public class PythonProcessorIT extends NiFiSystemIT {
assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") )); assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
} }
@Test
public void testRecordTransformPartitioning() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity setRecordField = getClientUtil().createPythonProcessor("SetRecordField");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
// Add Reader and Writer
final ControllerServiceEntity csvReader = getClientUtil().createControllerService("MockCSVReader");
final ControllerServiceEntity csvWriter = getClientUtil().createControllerService("MockCSVWriter");
getClientUtil().enableControllerService(csvReader);
getClientUtil().enableControllerService(csvWriter);
// Configure the SetRecordField property
final Map<String, String> fieldMap = new HashMap<>();
fieldMap.put("Record Reader", csvReader.getId());
fieldMap.put("Record Writer", csvWriter.getId());
getClientUtil().updateProcessorProperties(setRecordField, fieldMap);
getClientUtil().setAutoTerminatedRelationships(setRecordField, new HashSet<>(Arrays.asList("original", "failure")));
// Set contents of GenerateFlowFile
getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("Text", "name, group\nJane Doe, default\nJake Doe, other"));
// Connect flow
getClientUtil().createConnection(generate, setRecordField, "success");
final ConnectionEntity outputConnection = getClientUtil().createConnection(setRecordField, terminate, "success");
// Wait for processor validation to complete
getClientUtil().waitForValidProcessor(generate.getId());
getClientUtil().waitForValidProcessor(setRecordField.getId());
// Run the flow
getClientUtil().startProcessor(generate);
getClientUtil().startProcessor(setRecordField);
// Wait for output data
waitForQueueCount(outputConnection.getId(), 2);
// Verify output contents. We don't know the order that the fields will be in, but we know that we should get back 2 fields per record: name, group.
final String ff1Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
final String[] ff1Lines = ff1Contents.split("\n");
final String ff1HeaderLine = ff1Lines[0];
final List<String> ff1Headers = Stream.of(ff1HeaderLine.split(","))
.map(String::trim)
.toList();
assertTrue(ff1Headers.contains("name"));
assertTrue(ff1Headers.contains("group"));
final Map<String, Integer> ff1HeaderIndices = new HashMap<>();
int index = 0;
for (final String header : ff1Headers) {
ff1HeaderIndices.put(header, index++);
}
final String firstRecordLine = ff1Lines[1];
final List<String> firstRecordValues = Stream.of(firstRecordLine.split(","))
.map(String::trim)
.toList();
assertEquals("Jane Doe", firstRecordValues.get( ff1HeaderIndices.get("name") ));
assertEquals("default", firstRecordValues.get( ff1HeaderIndices.get("group") ));
final String ff2Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 1);
final String[] ff2Lines = ff2Contents.split("\n");
final String ff2HeaderLine = ff2Lines[0];
final List<String> ff2Headers = Stream.of(ff2HeaderLine.split(","))
.map(String::trim)
.toList();
assertTrue(ff2Headers.contains("name"));
assertTrue(ff2Headers.contains("group"));
final Map<String, Integer> headerIndices = new HashMap<>();
index = 0;
for (final String header : ff2Headers) {
headerIndices.put(header, index++);
}
final String secondRecordLine = ff2Lines[1];
final List<String> secondRecordValues = Stream.of(secondRecordLine.split(","))
.map(String::trim)
.toList();
assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") ));
assertEquals("other", secondRecordValues.get( headerIndices.get("group") ));
}
@Test @Test
public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException { public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
final String messageContents = "Hello World"; final String messageContents = "Hello World";