diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java index df44149c21..ff8d6699a3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -372,7 +373,7 @@ public class JoinEnrichment extends BinFiles { final WriteResult writeResult; final String mimeType; FlowFile output; - try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, session, writerSchema)) { + try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, combinedAttributes, session, writerSchema)) { // Create output FlowFile output = session.create(flowFiles); @@ -426,10 +427,10 @@ public class JoinEnrichment extends BinFiles { private RecordJoinStrategy getJoinStrategy(final ProcessContext context, final Map attributes) { final String strategyName = context.getProperty(JOIN_STRATEGY).getValue(); if (strategyName.equalsIgnoreCase(JOIN_SQL.getValue())) { - final String sql = context.getProperty(SQL).getValue(); + final PropertyValue sqlPropertyValue = context.getProperty(SQL); final int defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(attributes).asInteger(); final int defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(attributes).asInteger(); - return new SqlJoinStrategy(sqlJoinCache, sql, getLogger(), defaultPrecision, defaultScale); + return new SqlJoinStrategy(sqlJoinCache, sqlPropertyValue, getLogger(), defaultPrecision, defaultScale); } else if (strategyName.equalsIgnoreCase(JOIN_WRAPPER.getValue())) { return new WrapperJoinStrategy(getLogger()); } else if (strategyName.equalsIgnoreCase(JOIN_INSERT_ENRICHMENT_FIELDS.getValue())) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java index 78c4b91ec7..30e7bfa739 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java @@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.RecordSet; import java.io.IOException; import java.io.InputStream; +import java.util.Map; public abstract class IndexCorrelatedJoinStrategy implements RecordJoinStrategy { private final ComponentLog logger; @@ -41,7 +42,9 @@ public abstract class IndexCorrelatedJoinStrategy implements RecordJoinStrategy } @Override - public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema writerSchema) throws Exception { + public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map combinedAttributes, + final ProcessSession session,final RecordSchema writerSchema) throws Exception { + final FlowFile originalFlowFile = originalInput.getFlowFile(); final FlowFile enrichmentFlowFile = enrichmentInput.getFlowFile(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/RecordJoinStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/RecordJoinStrategy.java index 50b90a4557..8ae295c676 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/RecordJoinStrategy.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/RecordJoinStrategy.java @@ -20,6 +20,8 @@ package org.apache.nifi.processors.standard.enrichment; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.serialization.record.RecordSchema; +import java.util.Map; + public interface RecordJoinStrategy { - RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, ProcessSession session, RecordSchema outputSchema) throws Exception; + RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, Map combinedAttributes, ProcessSession session, RecordSchema outputSchema) throws Exception; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/SqlJoinStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/SqlJoinStrategy.java index 00dbe1f2d1..f34008cae9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/SqlJoinStrategy.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/SqlJoinStrategy.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard.enrichment; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.queryrecord.RecordDataSource; @@ -28,6 +29,7 @@ import org.apache.nifi.sql.NiFiTable; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Map; public class SqlJoinStrategy implements RecordJoinStrategy { public static final String ENRICHMENT_TABLE_NAME = "ENRICHMENT"; @@ -35,20 +37,23 @@ public class SqlJoinStrategy implements RecordJoinStrategy { private final SqlJoinCache cache; private final ComponentLog logger; - private final String sql; + private final PropertyValue sqlPropertyValue; private final int defaultPrecision; private final int defaultScale; - public SqlJoinStrategy(final SqlJoinCache cache, final String sql, final ComponentLog logger, final int defaultPrecision, final int defaultScale) { + public SqlJoinStrategy(final SqlJoinCache cache, final PropertyValue sqlPropertyValue, final ComponentLog logger, final int defaultPrecision, final int defaultScale) { this.cache = cache; - this.sql = sql; + this.sqlPropertyValue = sqlPropertyValue; this.logger = logger; this.defaultPrecision = defaultPrecision; this.defaultScale = defaultScale; } @Override - public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema outputSchema) throws SQLException { + public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map combinedAttributes, + final ProcessSession session, final RecordSchema outputSchema) throws SQLException { + + final String sql = sqlPropertyValue.evaluateAttributeExpressions(combinedAttributes).getValue(); final SqlJoinCalciteParameters calciteParameters = cache.getCalciteParameters(sql, outputSchema, originalInput, enrichmentInput); final NiFiTable originalTable = calciteParameters.getDatabase().getTable(ORIGINAL_TABLE_NAME); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java index 000c2051f8..a2778f72bf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java @@ -154,6 +154,43 @@ public class TestJoinEnrichment { assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType()); } + @Test + public void testELReferencingAttribute() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment()); + + final ArrayListRecordWriter writer = setupCsvServices(runner); + runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL); + runner.setProperty(JoinEnrichment.SQL, "SELECT original.id, enrichment.${desired_enrichment_column} FROM original JOIN enrichment ON original.id = enrichment.id"); + + final Map originalAttributes = new HashMap<>(); + originalAttributes.put("enrichment.group.id", "abc"); + originalAttributes.put("enrichment.role", "ORIGINAL"); + runner.enqueue("id\n5", originalAttributes); + + final Map enrichmentAttributes = new HashMap<>(); + enrichmentAttributes.put("enrichment.group.id", "abc"); + enrichmentAttributes.put("enrichment.role", "ENRICHMENT"); + enrichmentAttributes.put("desired_enrichment_column", "name"); + runner.enqueue("id,name\n5,John Doe", enrichmentAttributes); + + runner.run(); + + runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1); + runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2); + + final List written = writer.getRecordsWritten(); + assertEquals(1, written.size()); + + final Record outRecord = written.get(0); + assertEquals(5, outRecord.getAsInt("id")); + assertEquals("John Doe", outRecord.getValue("name")); + + final RecordSchema schema = outRecord.getSchema(); + assertEquals(RecordFieldType.STRING, schema.getField("id").get().getDataType().getFieldType()); + assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType()); + } + + // Tests that the LEFT OUTER JOIN example in the Additional Details works as expected @Test public void testLeftOuterJoin() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {