NIFI-6082: Refactor the way to handle fields nullable

- Make enriched fields nullable at LookupRecord.
- Removed unnecessary AvroConversionOptions and reader schema creation,
because ResultSetRecordSet can generate NiFi Record Schema from RS
directly. No Avro schema is needed to do that.
This commit is contained in:
Koji Kawamura 2019-03-13 16:50:01 +09:00
parent ca76fe178c
commit 4db5446c87
4 changed files with 53 additions and 34 deletions

View File

@ -55,21 +55,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private static final String FLOAT_CLASS_NAME = Float.class.getName(); private static final String FLOAT_CLASS_NAME = Float.class.getName();
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
this(rs, readerSchema, false);
}
/**
* Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
*
* @param rs The underlying ResultSet for this RecordSet
* @param readerSchema The schema to which this RecordSet adheres
* @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
* @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
*/
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
this.rs = rs; this.rs = rs;
moreRows = rs.next(); moreRows = rs.next();
this.schema = createSchema(rs, readerSchema, allFieldsNullable); this.schema = createSchema(rs, readerSchema);
rsColumnNames = new HashSet<>(); rsColumnNames = new HashSet<>();
final ResultSetMetaData metadata = rs.getMetaData(); final ResultSetMetaData metadata = rs.getMetaData();
@ -152,7 +140,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return value; return value;
} }
private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException { private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData(); final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount(); final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols); final List<RecordField> fields = new ArrayList<>(numCols);
@ -166,7 +154,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final int nullableFlag = metadata.isNullable(column); final int nullableFlag = metadata.isNullable(column);
final boolean nullable; final boolean nullable;
if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) { if (nullableFlag == ResultSetMetaData.columnNoNulls) {
nullable = false; nullable = false;
} else { } else {
nullable = true; nullable = true;

View File

@ -316,7 +316,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) { if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
final Record lookupRecord = (Record) lookupValue; final Record lookupRecord = (Record) lookupValue;
// Use wants to add all fields of the resultant Record to the specified Record Path. // User wants to add all fields of the resultant Record to the specified Record Path.
// If the destination Record Path returns to us a Record, then we will add all field values of // If the destination Record Path returns to us a Record, then we will add all field values of
// the Lookup Record to the destination Record. However, if the destination Record Path returns // the Lookup Record to the destination Record. However, if the destination Record Path returns
// something other than a Record, then we can't add the fields to it. We can only replace it, // something other than a Record, then we can't add the fields to it. We can only replace it,
@ -332,7 +332,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName); final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
if (recordFieldOption.isPresent()) { if (recordFieldOption.isPresent()) {
destinationRecord.setValue(recordFieldOption.get(), value); // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
// and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
// then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
RecordField field = recordFieldOption.get();
if (!routeToMatchedUnmatched && !field.isNullable()) {
field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
}
destinationRecord.setValue(field, value);
} else { } else {
destinationRecord.setValue(fieldName, value); destinationRecord.setValue(fieldName, value);
} }

View File

@ -395,6 +395,46 @@ public class TestLookupRecord {
|| outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
} }
@Test
public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException {
final RecordLookup lookupService = new RecordLookup();
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
// Even if the looked up record's original schema is not nullable, the result record's enriched fields should be nullable.
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false));
fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record sports = new MapRecord(schema, new HashMap<>());
sports.setValue("favorite", "basketball");
sports.setValue("least", "soccer");
lookupService.addValue("John Doe", sports);
// Incoming Record doesn't have the fields to be enriched.
recordReader = new MockRecordParser();
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addRecord("John Doe", 48);
recordReader.addRecord("Jane Doe", 47);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty("lookup", "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
runner.enqueue("");
runner.run();
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
}
private static class MapLookup extends AbstractControllerService implements StringLookupService { private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>(); private final Map<String, String> values = new HashMap<>();

View File

@ -19,12 +19,10 @@ package org.apache.nifi.lookup.db;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.Expiry;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext;
@ -34,10 +32,8 @@ import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.db.JdbcCommon;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
@ -62,7 +58,6 @@ import java.util.stream.Stream;
public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService { public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
private volatile Cache<Tuple<String, Object>, Record> cache; private volatile Cache<Tuple<String, Object>, Record> cache;
private volatile JdbcCommon.AvroConversionOptions options;
static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder() static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
.name("dbrecord-lookup-value-columns") .name("dbrecord-lookup-value-columns")
@ -120,15 +115,6 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
.build(); .build();
} }
} }
options = JdbcCommon.AvroConversionOptions.builder()
.recordName("NiFi_DB_Record_Lookup")
// Ignore duplicates
.maxRows(1)
// Keep column names as field names
.convertNames(false)
.useLogicalTypes(true)
.build();
} }
@Override @Override
@ -173,9 +159,7 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
st.setObject(1, key); st.setObject(1, key);
ResultSet resultSet = st.executeQuery(); ResultSet resultSet = st.executeQuery();
final Schema avroSchema = JdbcCommon.createSchema(resultSet, options); ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null);
final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
foundRecord = resultSetRecordSet.next(); foundRecord = resultSetRecordSet.next();
// Populate the cache if the record is present // Populate the cache if the record is present