NIFI-12023: Add FastCSV parser to CSVReader

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7685.
Authored by Matt Burgess on 2023-09-12 22:04:47 -04:00, committed by Pierre Villard
parent f27ace1ccf
commit a7a281c215
9 changed files with 733 additions and 2 deletions

View File

@@ -151,6 +151,8 @@ public class CSVUtils {
"* Apache Commons CSV - duplicate headers will result in column data \"shifting\" right with new fields " +
"created for \"unknown_field_index_X\" where \"X\" is the CSV column index number\n" +
"* Jackson CSV - duplicate headers will be de-duplicated with the field value being that of the right-most " +
"duplicate CSV column\n" +
"* FastCSV - duplicate headers will be de-duplicated with the field value being that of the left-most " +
"duplicate CSV column")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
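
As a small worked example of the duplicate-header handling described above (derived from that description, not from new test data), consider the header row "id,id,name" with the data row "1,2,Matt":
* Apache Commons CSV - the duplicate column's data shifts right into generated "unknown_field_index_X" fields
* Jackson CSV - the record keeps id=2 and name=Matt (the right-most duplicate wins)
* FastCSV - the record keeps id=1 and name=Matt (the left-most duplicate wins)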

View File

@@ -75,6 +75,11 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
<dependency>
<groupId>de.siegmar</groupId>
<artifactId>fastcsv</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@@ -183,8 +188,11 @@
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
<exclude>src/test/resources/csv/prov-events.csv</exclude>
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>

View File

@@ -150,7 +150,7 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
return value;
}
private String trim(String value) {
protected String trim(String value) {
return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
}

View File

@@ -68,6 +68,12 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV",
"The CSV parser implementation from the Jackson Dataformats library.");
public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV",
"The CSV parser implementation from the FastCSV library. NOTE: This parser only officially supports RFC-4180, so it recommended to "
+ "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data, for that case set the 'CSV Format' property to "
+ "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this "
+ "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Ignore Header'), but otherwise may process the input as expected even "
+ "if the data is not fully RFC-4180 compliant.");
public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder()
.name("csv-reader-csv-parser")
@@ -75,7 +81,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
.description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality "
+ "and may also exhibit different levels of performance.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
.allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV, FAST_CSV)
.defaultValue(APACHE_COMMONS_CSV.getValue())
.required(true)
.build();
@@ -175,6 +181,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else if (JACKSON_CSV.getValue().equals(csvParser)) {
return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else if (FAST_CSV.getValue().equals(csvParser)) {
return new FastCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else {
throw new IOException("Parser not supported");
}
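
A minimal sketch of how the new parser could be selected when configuring a CSVReader in a unit test. This is illustrative only: it assumes the nifi-mock TestRunner framework, uses a hypothetical NoOpProcessor placeholder (not part of this commit), and relies on the existing CSVUtils.CSV_FORMAT and CSVUtils.RFC_4180 constants.

import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

public class FastCSVParserSelectionSketch {

    @Test
    public void selectFastCsvParser() throws Exception {
        // NoOpProcessor is a hypothetical stand-in; any processor class accepted by TestRunners works here.
        final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);

        final CSVReader csvReader = new CSVReader();
        runner.addControllerService("csv-reader", csvReader);

        // Pick the FastCSV implementation added by this commit and, as its description recommends,
        // pair it with the RFC 4180 format.
        runner.setProperty(csvReader, CSVReader.CSV_PARSER, CSVReader.FAST_CSV.getValue());
        runner.setProperty(csvReader, CSVUtils.CSV_FORMAT, CSVUtils.RFC_4180.getValue());

        runner.enableControllerService(csvReader);
    }
}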

View File

@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.csv;
import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
public class FastCSVRecordReader extends AbstractCSVRecordReader {
private final CsvReader csvReader;
private final Iterator<CsvRow> csvRowIterator;
private List<RecordField> recordFields;
private Map<String, Integer> headerMap;
private final boolean ignoreHeader;
private final boolean trimDoubleQuote;
private final CSVFormat csvFormat;
public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote);
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
this.csvFormat = csvFormat;
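// Map the relevant Apache Commons CSVFormat settings onto FastCSV's CsvReaderBuilder
// (delimiter, quote character, comment handling, empty-row skipping, and field-count checking).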
CsvReader.CsvReaderBuilder builder = CsvReader.builder()
.fieldSeparator(csvFormat.getDelimiter())
.quoteCharacter(csvFormat.getQuoteCharacter())
.commentStrategy(CommentStrategy.SKIP)
.skipEmptyRows(csvFormat.getIgnoreEmptyLines())
.errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames());
if (csvFormat.getCommentMarker() != null) {
builder.commentCharacter(csvFormat.getCommentMarker());
}
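// When a usable header row is expected (hasHeader and not ignoreHeader), leave headerMap null so it is
// built from the first row read in getRecordFields(); otherwise pre-populate it from the schema's field names.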
if (hasHeader && !ignoreHeader) {
headerMap = null;
} else {
headerMap = new HashMap<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
headerMap.put(schema.getField(i).getFieldName(), i);
}
}
csvReader = builder.build(new InputStreamReader(in, encoding));
csvRowIterator = csvReader.iterator();
}
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
try {
final RecordSchema schema = getSchema();
final List<RecordField> recordFields = getRecordFields();
final int numFieldNames = recordFields.size();
if (!csvRowIterator.hasNext()) {
return null;
}
final CsvRow csvRecord = csvRowIterator.next();
final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
for (int i = 0; i < csvRecord.getFieldCount(); i++) {
String rawValue = csvRecord.getField(i);
if (csvFormat.getTrim()) {
rawValue = rawValue.trim();
}
if (trimDoubleQuote) {
rawValue = trim(rawValue);
}
final String rawFieldName;
final DataType dataType;
if (i >= numFieldNames) {
if (!dropUnknownFields) {
values.put("unknown_field_index_" + i, rawValue);
}
continue;
} else {
final RecordField recordField = recordFields.get(i);
rawFieldName = recordField.getFieldName();
dataType = recordField.getDataType();
}
final Object value;
if (coerceTypes) {
value = convert(rawValue, dataType, rawFieldName);
} else {
// The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
// dictate a field type. As a result, we will use the schema that we have to attempt to convert
// the value into the desired type if it's a simple type.
value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
}
values.putIfAbsent(rawFieldName, value);
}
return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
} catch (Exception e) {
throw new MalformedRecordException("Error while getting next record", e);
}
}
private List<RecordField> getRecordFields() {
if (this.recordFields != null) {
return this.recordFields;
}
if (ignoreHeader) {
logger.debug("With 'Ignore Header' set to true, FastCSV still reads the header and keeps track "
+ "of the number of fields in the header. This will cause an error if the provided schema does not "
+ "have the same number of fields, as this is not conformant to RFC-4180");
}
// When getting the field names from the first record, it has to be read in
if (!csvRowIterator.hasNext()) {
return Collections.emptyList();
}
CsvRow headerRow = csvRowIterator.next();
headerMap = new HashMap<>();
for (int i = 0; i < headerRow.getFieldCount(); i++) {
String rawValue = headerRow.getField(i);
if (csvFormat.getTrim()) {
rawValue = rawValue.trim();
}
if (this.trimDoubleQuote) {
rawValue = trim(rawValue);
}
headerMap.put(rawValue, i);
}
// Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
final SortedMap<Integer, String> sortedMap = new TreeMap<>();
for (final Map.Entry<String, Integer> entry : headerMap.entrySet()) {
sortedMap.put(entry.getValue(), entry.getKey());
}
final List<RecordField> fields = new ArrayList<>();
final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
for (final String rawFieldName : rawFieldNames) {
final Optional<RecordField> option = schema.getField(rawFieldName);
if (option.isPresent()) {
fields.add(option.get());
} else {
fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType()));
}
}
this.recordFields = fields;
return fields;
}
@Override
public void close() throws IOException {
csvReader.close();
}
}

View File

@@ -0,0 +1,506 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.csv;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
public class TestFastCSVRecordReader {
private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
private CSVFormat format;
@BeforeEach
public void setUp() {
format = CSVFormat.RFC4180;
}
private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
}
return fields;
}
private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException {
return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", false);
}
private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws IOException {
return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote);
}
@Test
public void testUTF8() throws IOException, MalformedRecordException {
final String text = "name\n黃凱揚";
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) {
final Record record = reader.nextRecord();
final String name = (String) record.getValue("name");
assertEquals("黃凱揚", name);
}
}
@Test
public void testISO8859() throws IOException, MalformedRecordException {
final String text = "name\nÄËÖÜ";
final byte[] bytesUTF = text.getBytes(StandardCharsets.UTF_8);
final byte[] bytes8859 = text.getBytes(StandardCharsets.ISO_8859_1);
assertEquals(13, bytesUTF.length, "expected size=13 for UTF-8 representation of test data");
assertEquals(9, bytes8859.length, "expected size=9 for ISO-8859-1 representation of test data");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1));
final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(),
StandardCharsets.ISO_8859_1.name(), true)) {
final Record record = reader.nextRecord();
final String name = (String) record.getValue("name");
assertEquals("ÄËÖÜ", name);
}
}
@Test
public void testDate() throws IOException, MalformedRecordException {
final String dateValue = "1983-11-30";
final String text = "date\n11/30/1983";
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) {
final Record record = reader.nextRecord();
final Object date = record.getValue("date");
assertEquals(java.sql.Date.valueOf(dateValue), date);
}
}
@Test
public void testSimpleParse() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account_RFC4180.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format)) {
final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
assertArrayEquals(expectedValues, record);
assertNull(reader.nextRecord());
}
}
@Test
public void testExcelFormat() throws IOException, MalformedRecordException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("fieldA", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("fieldB", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "fieldA,fieldB";
final String inputRecord = "valueA,valueB";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.EXCEL)) {
final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[]{"valueA", "valueB"};
assertArrayEquals(expectedValues, record);
assertNull(reader.nextRecord());
}
}
@Test
public void testMultipleRecords() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_RFC4180.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format)) {
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
assertArrayEquals(firstExpectedValues, firstRecord);
final Object[] secondRecord = reader.nextRecord().getValues();
final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
assertArrayEquals(secondExpectedValues, secondRecord);
assertNull(reader.nextRecord());
}
}
@Test
public void testMissingField() throws IOException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, format)) {
// RFC-4180 does not allow missing column names
assertThrows(MalformedRecordException.class, reader::nextRecord);
}
}
@Test
public void testMissingField_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
final String inputRecord = "1, John, 40.80, \"123 My Street\", My City, MS, 11111";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim().withAllowMissingColumnNames(), false)) {
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals(40.8D, record.getValue("balance"));
assertEquals("\"123 My Street\"", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertNull(reader.nextRecord());
}
}
@Test
public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id,name,balance,address,city,state,zipCode,continent";
final String inputRecord = "1,John,40.80,\"123 My Street\",My City,MS,11111,North America";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, format)) {
final Record record = reader.nextRecord(true, true);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertNull(record.getValue("continent"));
assertNull(reader.nextRecord());
}
// test nextRawRecord does contain 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, format)) {
final Record record = reader.nextRecord(false, false);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertEquals("North America", record.getValue("continent"));
assertNull(reader.nextRecord(false, false));
}
}
@Test
public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, continent";
final String inputRecord = "1, John, 40.80, \"123 My Street\", My City, MS, 11111, North America";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) {
final Record record = reader.nextRecord(true, true);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("\"123 My Street\"", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertNull(record.getValue("continent"));
assertNull(reader.nextRecord());
}
// test nextRawRecord does contain 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) {
final Record record = reader.nextRecord(false, false);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("\"123 My Street\"", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertEquals("North America", record.getValue("continent"));
assertNull(reader.nextRecord(false, false));
}
}
@Test
public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode";
final String inputRecord = "1, John, 40.80, \"\"123 My Street\"\", My City, MS, 11111, USA";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// Use a CUSTOM format as FastCSV sometimes handles non-compliant data such as trimming spaces from values
format = CSVFormat.RFC4180.builder()
.setTrim(true)
.setIgnoreSurroundingSpaces(true)
.setAllowMissingColumnNames(true)
.build();
// Create another Record Reader that indicates that the header line is present but should be ignored. This should cause
// our schema to be the definitive list of what fields exist.
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) {
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
// If schema says that there are fields a, b, c
// and the CSV has a header line that says field names are a, b
// and then the data has values 1,2,3
// then a=1, b=2, c=null
assertNull(record.getValue("country"));
assertNull(reader.nextRecord());
}
}
@Test
public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode";
final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111, USA";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) {
// Expected behavior: reading the record should throw a MalformedRecordException
assertThrows(MalformedRecordException.class, reader::nextRecord);
}
// Create another Record Reader that indicates that the header line is present but should be ignored. This should cause
// our schema to be the definitive list of what fields exist.
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, CSVFormat.RFC4180.withTrim(), true, true,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) {
// RFC-4180 does not allow missing column names
assertThrows(MalformedRecordException.class, reader::nextRecord);
}
}
@Test
public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111, USA, North America";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final FastCSVRecordReader reader = createReader(bais, schema, format)) {
// RFC-4180 does not allow missing column names
assertThrows(MalformedRecordException.class, () -> reader.nextRecord(false, false));
}
}
@Test
public void testMultipleRecordsDelimitedWithSpecialChar() throws IOException, MalformedRecordException {
char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_spec_delimiter.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format)) {
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
assertArrayEquals(firstExpectedValues, firstRecord);
final Object[] secondRecord = reader.nextRecord().getValues();
final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
assertArrayEquals(secondExpectedValues, secondRecord);
assertNull(reader.nextRecord());
}
}
@Test
public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRecordException {
final CSVFormat format = CSVFormat.DEFAULT.builder()
.setHeader()
.setSkipHeaderRecord(true)
.setTrim(true)
.setQuote('"')
.setDelimiter(",")
.setEscape(null)
.build();
final List<RecordField> fields = getDefaultFields();
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapechar.csv");
final FastCSVRecordReader reader = createReader(fis, schema, format, true)) {
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[]{"1", "John Doe\\", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
assertArrayEquals(firstExpectedValues, firstRecord);
final Object[] secondRecord = reader.nextRecord().getValues();
final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
assertArrayEquals(secondExpectedValues, secondRecord);
assertNull(reader.nextRecord());
}
}
}

View File

@@ -0,0 +1,3 @@
id,name,balance,address,city,state,zipCode,country
1,John Doe,"4750.89",123 My Street,My City,MS,11111,USA
2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA

View File

@@ -0,0 +1,3 @@
id,name,balance,address,city,state,zipCode,country
1,John Doe\,"4750.89","123 My Street",My City,MS,11111,USA
2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA

View File

@@ -0,0 +1,2 @@
id,name,balance,address,city,state,zipCode,country
1,John Doe,4750.89,123 My Street,My City,MS,11111,USA