From 4db5446c878a9be1d621429686f2835dc642d550 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 13 Mar 2019 16:50:01 +0900 Subject: [PATCH] NIFI-6082: Refactor the way to handle fields nullable - Make enriched fields nullable at LookupRecord. - Removed unnecessary AvroConversionOptions and reader schema creation, because ResultSetRecordSet can generate NiFi Record Schema from RS directly. No Avro schema is needed to do that. --- .../record/ResultSetRecordSet.java | 18 ++------- .../processors/standard/LookupRecord.java | 11 ++++- .../processors/standard/TestLookupRecord.java | 40 +++++++++++++++++++ .../db/DatabaseRecordLookupService.java | 18 +-------- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index fc3d60fbfc..ee47c63ea5 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -55,21 +55,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable { private static final String FLOAT_CLASS_NAME = Float.class.getName(); public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { - this(rs, readerSchema, false); - } - - /** - * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema - * - * @param rs The underlying ResultSet for this RecordSet - * @param readerSchema The schema to which this RecordSet adheres - * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable. - * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata - */ - public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException { this.rs = rs; moreRows = rs.next(); - this.schema = createSchema(rs, readerSchema, allFieldsNullable); + this.schema = createSchema(rs, readerSchema); rsColumnNames = new HashSet<>(); final ResultSetMetaData metadata = rs.getMetaData(); @@ -152,7 +140,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return value; } - private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException { + private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { final ResultSetMetaData metadata = rs.getMetaData(); final int numCols = metadata.getColumnCount(); final List fields = new ArrayList<>(numCols); @@ -166,7 +154,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final int nullableFlag = metadata.isNullable(column); final boolean nullable; - if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) { + if (nullableFlag == ResultSetMetaData.columnNoNulls) { nullable = false; } else { nullable = true; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 96a8d3e556..23d132536b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -316,7 +316,7 @@ public class LookupRecord extends AbstractRouteRecord recordFieldOption = lookupRecord.getSchema().getField(fieldName); if (recordFieldOption.isPresent()) { - destinationRecord.setValue(recordFieldOption.get(), value); + // Even if the looked up field is not nullable, if the lookup key didn't match with any record, + // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship, + // then enriched fields should be nullable to support unmatched records whose enriched fields will be null. + RecordField field = recordFieldOption.get(); + if (!routeToMatchedUnmatched && !field.isNullable()) { + field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true); + } + destinationRecord.setValue(field, value); } else { destinationRecord.setValue(fieldName, value); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 3efd9d1f4f..f8fb158004 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -395,6 +395,46 @@ public class TestLookupRecord { || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); } + @Test + public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS); + + // Even if the looked up record's original schema is not nullable, the result record's enriched fields should be nullable. + final List fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false)); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true)); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap<>()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + // Incoming Record doesn't have the fields to be enriched. + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + + recordReader.addRecord("John Doe", 48); + recordReader.addRecord("Jane Doe", 47); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0); + out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n"); + } private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map values = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java index fdb145223f..b176b330ba 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java @@ -19,12 +19,10 @@ package org.apache.nifi.lookup.db; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; -import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -34,10 +32,8 @@ import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.util.Tuple; -import org.apache.nifi.util.db.JdbcCommon; import java.io.IOException; import java.sql.Connection; @@ -62,7 +58,6 @@ import java.util.stream.Stream; public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService { private volatile Cache, Record> cache; - private volatile JdbcCommon.AvroConversionOptions options; static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder() .name("dbrecord-lookup-value-columns") @@ -120,15 +115,6 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i .build(); } } - - options = JdbcCommon.AvroConversionOptions.builder() - .recordName("NiFi_DB_Record_Lookup") - // Ignore duplicates - .maxRows(1) - // Keep column names as field names - .convertNames(false) - .useLogicalTypes(true) - .build(); } @Override @@ -173,9 +159,7 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i st.setObject(1, key); ResultSet resultSet = st.executeQuery(); - final Schema avroSchema = JdbcCommon.createSchema(resultSet, options); - final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema); - ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true); + ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null); foundRecord = resultSetRecordSet.next(); // Populate the cache if the record is present