NIFI-11671: Allow FlowFile attributes to be referenced in SQL for JoinEnrichment

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8059.
This commit is contained in:
Mark Payne 2023-11-21 13:16:27 -05:00 committed by Pierre Villard
parent e68c384c12
commit d0c7ebb196
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 57 additions and 9 deletions

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -372,7 +373,7 @@ public class JoinEnrichment extends BinFiles {
final WriteResult writeResult;
final String mimeType;
FlowFile output;
try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, session, writerSchema)) {
try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, combinedAttributes, session, writerSchema)) {
// Create output FlowFile
output = session.create(flowFiles);
@ -426,10 +427,10 @@ public class JoinEnrichment extends BinFiles {
private RecordJoinStrategy getJoinStrategy(final ProcessContext context, final Map<String, String> attributes) {
final String strategyName = context.getProperty(JOIN_STRATEGY).getValue();
if (strategyName.equalsIgnoreCase(JOIN_SQL.getValue())) {
final String sql = context.getProperty(SQL).getValue();
final PropertyValue sqlPropertyValue = context.getProperty(SQL);
final int defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(attributes).asInteger();
final int defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(attributes).asInteger();
return new SqlJoinStrategy(sqlJoinCache, sql, getLogger(), defaultPrecision, defaultScale);
return new SqlJoinStrategy(sqlJoinCache, sqlPropertyValue, getLogger(), defaultPrecision, defaultScale);
} else if (strategyName.equalsIgnoreCase(JOIN_WRAPPER.getValue())) {
return new WrapperJoinStrategy(getLogger());
} else if (strategyName.equalsIgnoreCase(JOIN_INSERT_ENRICHMENT_FIELDS.getValue())) {

View File

@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.RecordSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
public abstract class IndexCorrelatedJoinStrategy implements RecordJoinStrategy {
private final ComponentLog logger;
@ -41,7 +42,9 @@ public abstract class IndexCorrelatedJoinStrategy implements RecordJoinStrategy
}
@Override
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema writerSchema) throws Exception {
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map<String, String> combinedAttributes,
final ProcessSession session,final RecordSchema writerSchema) throws Exception {
final FlowFile originalFlowFile = originalInput.getFlowFile();
final FlowFile enrichmentFlowFile = enrichmentInput.getFlowFile();

View File

@ -20,6 +20,8 @@ package org.apache.nifi.processors.standard.enrichment;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Map;
public interface RecordJoinStrategy {
RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, ProcessSession session, RecordSchema outputSchema) throws Exception;
RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, Map<String, String> combinedAttributes, ProcessSession session, RecordSchema outputSchema) throws Exception;
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard.enrichment;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.queryrecord.RecordDataSource;
@ -28,6 +29,7 @@ import org.apache.nifi.sql.NiFiTable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
public class SqlJoinStrategy implements RecordJoinStrategy {
public static final String ENRICHMENT_TABLE_NAME = "ENRICHMENT";
@ -35,20 +37,23 @@ public class SqlJoinStrategy implements RecordJoinStrategy {
private final SqlJoinCache cache;
private final ComponentLog logger;
private final String sql;
private final PropertyValue sqlPropertyValue;
private final int defaultPrecision;
private final int defaultScale;
public SqlJoinStrategy(final SqlJoinCache cache, final String sql, final ComponentLog logger, final int defaultPrecision, final int defaultScale) {
public SqlJoinStrategy(final SqlJoinCache cache, final PropertyValue sqlPropertyValue, final ComponentLog logger, final int defaultPrecision, final int defaultScale) {
this.cache = cache;
this.sql = sql;
this.sqlPropertyValue = sqlPropertyValue;
this.logger = logger;
this.defaultPrecision = defaultPrecision;
this.defaultScale = defaultScale;
}
@Override
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema outputSchema) throws SQLException {
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map<String, String> combinedAttributes,
final ProcessSession session, final RecordSchema outputSchema) throws SQLException {
final String sql = sqlPropertyValue.evaluateAttributeExpressions(combinedAttributes).getValue();
final SqlJoinCalciteParameters calciteParameters = cache.getCalciteParameters(sql, outputSchema, originalInput, enrichmentInput);
final NiFiTable originalTable = calciteParameters.getDatabase().getTable(ORIGINAL_TABLE_NAME);

View File

@ -154,6 +154,43 @@ public class TestJoinEnrichment {
assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType());
}
@Test
public void testELReferencingAttribute() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
final ArrayListRecordWriter writer = setupCsvServices(runner);
runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL);
runner.setProperty(JoinEnrichment.SQL, "SELECT original.id, enrichment.${desired_enrichment_column} FROM original JOIN enrichment ON original.id = enrichment.id");
final Map<String, String> originalAttributes = new HashMap<>();
originalAttributes.put("enrichment.group.id", "abc");
originalAttributes.put("enrichment.role", "ORIGINAL");
runner.enqueue("id\n5", originalAttributes);
final Map<String, String> enrichmentAttributes = new HashMap<>();
enrichmentAttributes.put("enrichment.group.id", "abc");
enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
enrichmentAttributes.put("desired_enrichment_column", "name");
runner.enqueue("id,name\n5,John Doe", enrichmentAttributes);
runner.run();
runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1);
runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2);
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
final Record outRecord = written.get(0);
assertEquals(5, outRecord.getAsInt("id"));
assertEquals("John Doe", outRecord.getValue("name"));
final RecordSchema schema = outRecord.getSchema();
assertEquals(RecordFieldType.STRING, schema.getField("id").get().getDataType().getFieldType());
assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType());
}
// Tests that the LEFT OUTER JOIN example in the Additional Details works as expected
@Test
public void testLeftOuterJoin() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {