NIFI-3910: Fixed issue where CSVRecordReader throws an Exception if a field is missing instead of using a null value

This closes #1807.
This commit is contained in:
Mark Payne 2017-05-16 11:09:14 -04:00
parent 824712bffe
commit 3f4b276b71
2 changed files with 40 additions and 5 deletions

View File

@ -68,17 +68,19 @@ public class CSVRecordReader implements RecordReader {
final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
for (final RecordField recordField : schema.getFields()) {
String rawValue = csvRecord.get(recordField.getFieldName());
if (rawValue == null) {
String rawValue = null;
final String fieldName = recordField.getFieldName();
if (csvRecord.isSet(fieldName)) {
rawValue = csvRecord.get(fieldName);
} else {
for (final String alias : recordField.getAliases()) {
rawValue = csvRecord.get(alias);
if (rawValue != null) {
if (csvRecord.isSet(alias)) {
rawValue = csvRecord.get(alias);
break;
}
}
}
final String fieldName = recordField.getFieldName();
if (rawValue == null) {
rowValues.put(fieldName, null);
continue;

View File

@ -18,6 +18,7 @@
package org.apache.nifi.csv;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.ByteArrayInputStream;
@ -143,4 +144,36 @@ public class TestCSVRecordReader {
assertNull(reader.nextRecord());
}
}
@Test
public void testMissingField() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream baos = new ByteArrayInputStream(inputData)) {
final CSVRecordReader reader = new CSVRecordReader(baos, Mockito.mock(ComponentLog.class), schema, format,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals(40.8D, record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertNull(reader.nextRecord());
}
}
}