NIFI-7734: Added Record Separator property to CSVReader

This commit is contained in:
Matthew Burgess 2020-08-12 17:01:15 -04:00 committed by markap14
parent 4417b9d64a
commit b7b2533ffe
3 changed files with 21 additions and 1 deletions

View File

@ -101,6 +101,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
properties.add(CSVUtils.CSV_FORMAT);
properties.add(CSVUtils.VALUE_SEPARATOR);
properties.add(CSVUtils.RECORD_SEPARATOR);
properties.add(CSVUtils.FIRST_LINE_IS_HEADER);
properties.add(CSVUtils.IGNORE_CSV_HEADER);
properties.add(CSVUtils.QUOTE_CHAR);

View File

@ -60,7 +60,7 @@ public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
.setColumnSeparator(csvFormat.getDelimiter())
.setLineSeparator(csvFormat.getRecordSeparator())
.setLineSeparator((csvFormat.getRecordSeparator() == null) ? "\n" : csvFormat.getRecordSeparator())
// Can only use comments in Jackson CSV if the correct marker is set
.setAllowComments("#" .equals(CharUtils.toString(csvFormat.getCommentMarker())))
// The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not

View File

@ -49,6 +49,7 @@ import static org.junit.Assert.assertNull;
public class TestJacksonCSVRecordReader {
private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
private final CSVFormat formatWithNullRecordSeparator = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withRecordSeparator(null);
private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
@ -391,4 +392,22 @@ public class TestJacksonCSVRecordReader {
assertNull(reader.nextRecord());
}
}
@Test
public void testNullRecordSeparator() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
final JacksonCSVRecordReader reader = createReader(fis, schema, formatWithNullRecordSeparator)) {
final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
Assert.assertArrayEquals(expectedValues, record);
assertNull(reader.nextRecord());
}
}
}