diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 07acc867d0..7d5c95c037 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -668,6 +668,7 @@ src/test/resources/TestValidateRecord/int-maps-data.json src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro src/test/resources/TestJoinEnrichment/insert-enrichment.json + src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json src/test/resources/TestJoinEnrichment/insert-original.json src/test/resources/TestJoinEnrichment/left-outer-join-enrichment.csv src/test/resources/TestJoinEnrichment/left-outer-join-expected.csv diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java index 7f09bf928e..8051c492af 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java @@ -39,6 +39,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy @Override protected Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final RecordSchema resultSchema) { + // We only need to incorporate the enrichment record's schema when determining the result schema. After that, + // we will use the result schema for writing, not the Record's schema. So we can ignore the expense of incorporating + // the fields. + return combineRecords(originalRecord, enrichmentRecord, false); + } + + /** + * Creates a single record that combines both the original record and the enrichment record + * @param originalRecord the original record + * @param enrichmentRecord the enrichment record + * @param incorporateEnrichmentSchema whether or not to update the originalRecord's schema to include the fields of the enrichmentRecord. Doing so can be + * expensive and is not necessary if the Record's schema will not be used. + * @return the combined record + */ + private Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final boolean incorporateEnrichmentSchema) { if (originalRecord == null) { return null; } @@ -57,17 +72,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy } final Record parentRecord = (Record) value; - enrichmentRecord.toMap().forEach(parentRecord::setValue); + if (incorporateEnrichmentSchema) { + parentRecord.incorporateSchema(enrichmentRecord.getSchema()); + } + enrichmentRecord.toMap().forEach(parentRecord::setValue); parentRecord.incorporateInactiveFields(); } return originalRecord; } + @Override protected RecordSchema createResultSchema(final Record firstOriginalRecord, final Record firstEnrichmentRecord) { - final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, firstOriginalRecord.getSchema()); + final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, true); combined.incorporateInactiveFields(); return combined.getSchema(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java index 1a30332dce..70568ec733 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; public class TestJoinEnrichment { private static final File EXAMPLES_DIR = new File("src/test/resources/TestJoinEnrichment"); @@ -149,7 +150,7 @@ public class TestJoinEnrichment { // Tests that the Insert Enrichment Record Fields example in the Additional Details produces expected output @Test - public void testInsertEnrichmentFields() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException { + public void testInsertEnrichmentFields() throws InitializationException, IOException { final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment()); final ArrayListRecordWriter writer = setupJsonServices(runner); @@ -191,6 +192,48 @@ public class TestJoinEnrichment { assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email")); } + // Tests that when the first enrichment record has a null value, that we still properly apply subsequent enrichments. + @Test + public void testFirstEnrichmentRecordNull() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment()); + + final ArrayListRecordWriter writer = setupJsonServices(runner); + runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_INSERT_ENRICHMENT_FIELDS); + runner.setProperty(JoinEnrichment.INSERTION_RECORD_PATH, "/purchase/customer"); + + final Map originalAttributes = new HashMap<>(); + originalAttributes.put("enrichment.group.id", "abc"); + originalAttributes.put("enrichment.role", "ORIGINAL"); + runner.enqueue(new File(EXAMPLES_DIR, "insert-original.json").toPath(), originalAttributes); + + final Map enrichmentAttributes = new HashMap<>(); + enrichmentAttributes.put("enrichment.group.id", "abc"); + enrichmentAttributes.put("enrichment.role", "ENRICHMENT"); + runner.enqueue(new File(EXAMPLES_DIR, "insert-enrichment-first-value-null.json").toPath(), enrichmentAttributes); + + runner.run(); + + runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1); + runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2); + + final List written = writer.getRecordsWritten(); + assertEquals(2, written.size()); + + final RecordPath recordPath = RecordPath.compile("/purchase/customer/customerDetails"); + + final List firstCustomerDetailsList = recordPath.evaluate(written.get(0)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); + assertEquals(1, firstCustomerDetailsList.size()); + final Record customerDetails = (Record) firstCustomerDetailsList.get(0); + assertNull(customerDetails); + + final List secondCustomerDetailsList = recordPath.evaluate(written.get(1)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); + assertEquals(1, secondCustomerDetailsList.size()); + final Record secondCustomerDetails = (Record) secondCustomerDetailsList.get(0); + assertEquals(5512, secondCustomerDetails.getValue("id")); + assertEquals("555-555-5511", secondCustomerDetails.getValue("phone")); + assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email")); + } + private List readCsvRecords(final File file) throws IOException, SchemaNotFoundException, MalformedRecordException { final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json new file mode 100644 index 0000000000..9631f11d2c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json @@ -0,0 +1,10 @@ +[ + { + "customerDetails": null + }, { + "customerDetails": { + "id": 5512, + "phone": "555-555-5511", + "email": "jane.doe@nifi.apache.org" + } +}] \ No newline at end of file