NIFI-10169: When using the Insert Record Fields join strategy of JoinEnrichment, ensure that in order to combine schemas from the original record and the enrichment record we use incorporateSchema() so that even when the first enrichment record is null, we get the schema correct

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6157.
This commit is contained in:
Mark Payne 2022-06-25 11:52:38 -04:00 committed by Pierre Villard
parent 4a9c3435db
commit 39c366eeef
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 76 additions and 3 deletions

View File

@ -668,6 +668,7 @@
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
<exclude>src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro</exclude>
<exclude>src/test/resources/TestJoinEnrichment/insert-enrichment.json</exclude>
<exclude>src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json</exclude>
<exclude>src/test/resources/TestJoinEnrichment/insert-original.json</exclude>
<exclude>src/test/resources/TestJoinEnrichment/left-outer-join-enrichment.csv</exclude>
<exclude>src/test/resources/TestJoinEnrichment/left-outer-join-expected.csv</exclude>

View File

@ -39,6 +39,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy
@Override
protected Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final RecordSchema resultSchema) {
// We only need to incorporate the enrichment record's schema when determining the result schema. After that,
// we will use the result schema for writing, not the Record's schema. So we can ignore the expense of incorporating
// the fields.
return combineRecords(originalRecord, enrichmentRecord, false);
}
/**
* Creates a single record that combines both the original record and the enrichment record
* @param originalRecord the original record
* @param enrichmentRecord the enrichment record
* @param incorporateEnrichmentSchema whether or not to update the originalRecord's schema to include the fields of the enrichmentRecord. Doing so can be
* expensive and is not necessary if the Record's schema will not be used.
* @return the combined record
*/
private Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final boolean incorporateEnrichmentSchema) {
if (originalRecord == null) {
return null;
}
@ -57,17 +72,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy
}
final Record parentRecord = (Record) value;
enrichmentRecord.toMap().forEach(parentRecord::setValue);
if (incorporateEnrichmentSchema) {
parentRecord.incorporateSchema(enrichmentRecord.getSchema());
}
enrichmentRecord.toMap().forEach(parentRecord::setValue);
parentRecord.incorporateInactiveFields();
}
return originalRecord;
}
@Override
protected RecordSchema createResultSchema(final Record firstOriginalRecord, final Record firstEnrichmentRecord) {
final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, firstOriginalRecord.getSchema());
final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, true);
combined.incorporateInactiveFields();
return combined.getSchema();
}

View File

@ -49,6 +49,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestJoinEnrichment {
private static final File EXAMPLES_DIR = new File("src/test/resources/TestJoinEnrichment");
@ -149,7 +150,7 @@ public class TestJoinEnrichment {
// Tests that the Insert Enrichment Record Fields example in the Additional Details produces expected output
@Test
public void testInsertEnrichmentFields() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
public void testInsertEnrichmentFields() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
final ArrayListRecordWriter writer = setupJsonServices(runner);
@ -191,6 +192,48 @@ public class TestJoinEnrichment {
assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email"));
}
// Tests that when the first enrichment record has a null value, that we still properly apply subsequent enrichments.
@Test
public void testFirstEnrichmentRecordNull() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
final ArrayListRecordWriter writer = setupJsonServices(runner);
runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_INSERT_ENRICHMENT_FIELDS);
runner.setProperty(JoinEnrichment.INSERTION_RECORD_PATH, "/purchase/customer");
final Map<String, String> originalAttributes = new HashMap<>();
originalAttributes.put("enrichment.group.id", "abc");
originalAttributes.put("enrichment.role", "ORIGINAL");
runner.enqueue(new File(EXAMPLES_DIR, "insert-original.json").toPath(), originalAttributes);
final Map<String, String> enrichmentAttributes = new HashMap<>();
enrichmentAttributes.put("enrichment.group.id", "abc");
enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
runner.enqueue(new File(EXAMPLES_DIR, "insert-enrichment-first-value-null.json").toPath(), enrichmentAttributes);
runner.run();
runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1);
runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2);
final List<Record> written = writer.getRecordsWritten();
assertEquals(2, written.size());
final RecordPath recordPath = RecordPath.compile("/purchase/customer/customerDetails");
final List<Object> firstCustomerDetailsList = recordPath.evaluate(written.get(0)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
assertEquals(1, firstCustomerDetailsList.size());
final Record customerDetails = (Record) firstCustomerDetailsList.get(0);
assertNull(customerDetails);
final List<Object> secondCustomerDetailsList = recordPath.evaluate(written.get(1)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
assertEquals(1, secondCustomerDetailsList.size());
final Record secondCustomerDetails = (Record) secondCustomerDetailsList.get(0);
assertEquals(5512, secondCustomerDetails.getValue("id"));
assertEquals("555-555-5511", secondCustomerDetails.getValue("phone"));
assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email"));
}
private List<Record> readCsvRecords(final File file) throws IOException, SchemaNotFoundException, MalformedRecordException {
final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader();

View File

@ -0,0 +1,10 @@
[
{
"customerDetails": null
}, {
"customerDetails": {
"id": 5512,
"phone": "555-555-5511",
"email": "jane.doe@nifi.apache.org"
}
}]