From a7a281c215b2ca2c172981159cb7ca9399ba6486 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 12 Sep 2023 22:04:47 -0400 Subject: [PATCH] NIFI-12023: Add FastCSV parser to CSVReader Signed-off-by: Pierre Villard This closes #7685. --- .../java/org/apache/nifi/csv/CSVUtils.java | 2 + .../pom.xml | 8 + .../nifi/csv/AbstractCSVRecordReader.java | 2 +- .../java/org/apache/nifi/csv/CSVReader.java | 10 +- .../apache/nifi/csv/FastCSVRecordReader.java | 199 +++++++ .../nifi/csv/TestFastCSVRecordReader.java | 506 ++++++++++++++++++ .../csv/multi-bank-account_RFC4180.csv | 3 + .../multi-bank-account_escapechar_RFC4180.csv | 3 + .../csv/single-bank-account_RFC4180.csv | 2 + 9 files changed, 733 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java index 
50efc8dc8b..cd3b16a3f1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ -151,6 +151,8 @@ public class CSVUtils { "* Apache Commons CSV - duplicate headers will result in column data \"shifting\" right with new fields " + "created for \"unknown_field_index_X\" where \"X\" is the CSV column index number\n" + "* Jackson CSV - duplicate headers will be de-duplicated with the field value being that of the right-most " + + "duplicate CSV column\n" + + "* FastCSV - duplicate headers will be de-duplicated with the field value being that of the left-most " + "duplicate CSV column") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index c636c23f63..5bc6e4e968 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -75,6 +75,11 @@ com.fasterxml.jackson.dataformat jackson-dataformat-csv + + de.siegmar + fastcsv + 2.2.2 + commons-io commons-io @@ -183,8 +188,11 @@ src/test/resources/csv/extra-white-space.csv src/test/resources/csv/multi-bank-account.csv + src/test/resources/csv/multi-bank-account_RFC4180.csv src/test/resources/csv/single-bank-account.csv + src/test/resources/csv/single-bank-account_RFC4180.csv src/test/resources/csv/multi-bank-account_escapechar.csv + src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv 
src/test/resources/csv/multi-bank-account_spec_delimiter.csv src/test/resources/csv/prov-events.csv src/test/resources/grok/error-with-stack-trace.log diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java index 4174e9d476..6a2651bd9e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java @@ -150,7 +150,7 @@ abstract public class AbstractCSVRecordReader implements RecordReader { return value; } - private String trim(String value) { + protected String trim(String value) { return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"") ? 
value.substring(1, value.length() - 1) : value; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index 56ad3b6331..9974b07f39 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -68,6 +68,12 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV", "The CSV parser implementation from the Jackson Dataformats library."); + public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV", + "The CSV parser implementation from the FastCSV library. NOTE: This parser only officially supports RFC-4180, so it is recommended to " + + "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data, for that case set the 'CSV Format' property to " + + "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. 
Be aware that this " + + "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Ignore Header'), but otherwise may process the input as expected even " + + "if the data is not fully RFC-4180 compliant."); public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder() .name("csv-reader-csv-parser") @@ -75,7 +81,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact .description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality " + "and may also exhibit different levels of performance.") .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV) + .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV, FAST_CSV) .defaultValue(APACHE_COMMONS_CSV.getValue()) .required(true) .build(); @@ -175,6 +181,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote); } else if (JACKSON_CSV.getValue().equals(csvParser)) { return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote); + } else if (FAST_CSV.getValue().equals(csvParser)) { + return new FastCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote); } else { throw new IOException("Parser not supported"); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java new file mode 100644 index 0000000000..649e62f5b9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import de.siegmar.fastcsv.reader.CommentStrategy; +import de.siegmar.fastcsv.reader.CsvReader; +import de.siegmar.fastcsv.reader.CsvRow; +import org.apache.commons.csv.CSVFormat; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; + +public class FastCSVRecordReader extends AbstractCSVRecordReader { + private final CsvReader csvReader; + private final Iterator csvRowIterator; + + private List recordFields; + + private Map headerMap; + + private final boolean ignoreHeader; + private final boolean trimDoubleQuote; + private final CSVFormat csvFormat; + + public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, + final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException { + super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote); + this.ignoreHeader = ignoreHeader; + this.trimDoubleQuote = trimDoubleQuote; + this.csvFormat = csvFormat; + + CsvReader.CsvReaderBuilder builder = CsvReader.builder() + 
.fieldSeparator(csvFormat.getDelimiter()) + .quoteCharacter(csvFormat.getQuoteCharacter()) + .commentStrategy(CommentStrategy.SKIP) + .skipEmptyRows(csvFormat.getIgnoreEmptyLines()) + .errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames()); + + if (csvFormat.getCommentMarker() != null) { + builder.commentCharacter(csvFormat.getCommentMarker()); + } + + if (hasHeader && !ignoreHeader) { + headerMap = null; + } else { + headerMap = new HashMap<>(); + for (int i = 0; i < schema.getFieldCount(); i++) { + headerMap.put(schema.getField(i).getFieldName(), i); + } + } + + csvReader = builder.build(new InputStreamReader(in, encoding)); + csvRowIterator = csvReader.iterator(); + } + + @Override + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { + + try { + final RecordSchema schema = getSchema(); + + final List recordFields = getRecordFields(); + final int numFieldNames = recordFields.size(); + if (!csvRowIterator.hasNext()) { + return null; + } + final CsvRow csvRecord = csvRowIterator.next(); + final Map values = new LinkedHashMap<>(recordFields.size() * 2); + for (int i = 0; i < csvRecord.getFieldCount(); i++) { + String rawValue = csvRecord.getField(i); + if (csvFormat.getTrim()) { + rawValue = rawValue.trim(); + } + if (trimDoubleQuote) { + rawValue = trim(rawValue); + } + + final String rawFieldName; + final DataType dataType; + if (i >= numFieldNames) { + if (!dropUnknownFields) { + values.put("unknown_field_index_" + i, rawValue); + } + continue; + } else { + final RecordField recordField = recordFields.get(i); + rawFieldName = recordField.getFieldName(); + dataType = recordField.getDataType(); + } + + final Object value; + if (coerceTypes) { + value = convert(rawValue, dataType, rawFieldName); + } else { + // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to + // dictate a field type. 
As a result, we will use the schema that we have to attempt to convert + // the value into the desired type if it's a simple type. + value = convertSimpleIfPossible(rawValue, dataType, rawFieldName); + } + + values.putIfAbsent(rawFieldName, value); + } + + return new MapRecord(schema, values, coerceTypes, dropUnknownFields); + } catch (Exception e) { + throw new MalformedRecordException("Error while getting next record", e); + } + } + + + private List getRecordFields() { + if (this.recordFields != null) { + return this.recordFields; + } + + if (ignoreHeader) { + logger.debug("With 'Ignore Header' set to true, FastCSV still reads the header and keeps track " + + "of the number of fields in the header. This will cause an error if the provided schema does not " + + "have the same number of fields, as this is not conformant to RFC-4180"); + } + + // When getting the field names from the first record, it has to be read in + if (!csvRowIterator.hasNext()) { + return Collections.emptyList(); + } + CsvRow headerRow = csvRowIterator.next(); + headerMap = new HashMap<>(); + for (int i = 0; i < headerRow.getFieldCount(); i++) { + String rawValue = headerRow.getField(i); + if (csvFormat.getTrim()) { + rawValue = rawValue.trim(); + } + if (this.trimDoubleQuote) { + rawValue = trim(rawValue); + } + headerMap.put(rawValue, i); + } + + + // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order + final SortedMap sortedMap = new TreeMap<>(); + for (final Map.Entry entry : headerMap.entrySet()) { + sortedMap.put(entry.getValue(), entry.getKey()); + } + + final List fields = new ArrayList<>(); + final List rawFieldNames = new ArrayList<>(sortedMap.values()); + for (final String rawFieldName : rawFieldNames) { + final Optional option = schema.getField(rawFieldName); + if (option.isPresent()) { + fields.add(option.get()); + } else { + fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType())); + } + } + + 
this.recordFields = fields; + return fields; + } + + @Override + public void close() throws IOException { + csvReader.close(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java new file mode 100644 index 0000000000..929113eaf8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestFastCSVRecordReader { + private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType(); + private CSVFormat format; + + @BeforeEach + public void setUp() { + format = CSVFormat.RFC4180; + } + + private List getDefaultFields() { + final List fields = new ArrayList<>(); + for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) { + fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType())); + } + return fields; + } + + private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException { + return new FastCSVRecordReader(in, 
Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", false); + } + + private FastCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws IOException { + return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote); + } + + @Test + public void testUTF8() throws IOException, MalformedRecordException { + final String text = "name\n黃凱揚"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)); + final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) { + + final Record record = reader.nextRecord(); + final String name = (String) record.getValue("name"); + + assertEquals("黃凱揚", name); + } + } + + @Test + public void testISO8859() throws IOException, MalformedRecordException { + final String text = "name\nÄËÖÜ"; + final byte[] bytesUTF = text.getBytes(StandardCharsets.UTF_8); + final byte[] bytes8859 = text.getBytes(StandardCharsets.ISO_8859_1); + assertEquals(13, bytesUTF.length, "expected size=13 for UTF-8 representation of test data"); + assertEquals(9, bytes8859.length, "expected size=9 for ISO-8859-1 representation of test data"); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1)); + final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), + StandardCharsets.ISO_8859_1.name(), true)) { + + final Record record = reader.nextRecord(); + final String name = (String) record.getValue("name"); + + assertEquals("ÄËÖÜ", name); + } + } + + @Test + public void testDate() throws IOException, MalformedRecordException { + final String dateValue = "1983-11-30"; + final String text = "date\n11/30/1983"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) { + + final Record record = reader.nextRecord(); + final Object date = record.getValue("date"); + assertEquals(java.sql.Date.valueOf(dateValue), date); + } + } + + @Test + public void testSimpleParse() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account_RFC4180.csv"); + final FastCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] record = reader.nextRecord().getValues(); + final Object[] expectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(expectedValues, record); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testExcelFormat() throws IOException, MalformedRecordException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("fieldA", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("fieldB", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "fieldA,fieldB"; + final String inputRecord = "valueA,valueB"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.EXCEL)) { + + final Object[] record = reader.nextRecord().getValues(); + final Object[] expectedValues = new Object[]{"valueA", "valueB"}; + assertArrayEquals(expectedValues, record); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testMultipleRecords() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_RFC4180.csv"); + final FastCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testMissingField() throws IOException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, format)) { + + // RFC-4180 does not allow missing column names + assertThrows(MalformedRecordException.class, reader::nextRecord); + } + } + + @Test + public void testMissingField_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, \"123 My Street\", My City, MS, 11111"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim().withAllowMissingColumnNames(), false)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals(40.8D, record.getValue("balance")); + assertEquals("\"123 My Street\"", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id,name,balance,address,city,state,zipCode,continent"; + final String inputRecord = "1,John,40.80,\"123 My Street\",My City,MS,11111,North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test nextRecord does not contain a 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(true, true); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); 
+ assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertNull(record.getValue("continent")); + + assertNull(reader.nextRecord()); + } + + // test nextRawRecord does contain 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertEquals("North America", record.getValue("continent")); + + assertNull(reader.nextRecord(false, false)); + } + } + + @Test + public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, continent"; + final String inputRecord = "1, John, 40.80, \"123 My Street\", My City, MS, 11111, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test nextRecord does not contain a 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) { + + final Record record = 
reader.nextRecord(true, true); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("\"123 My Street\"", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertNull(record.getValue("continent")); + + assertNull(reader.nextRecord()); + } + + // test nextRawRecord does contain 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("\"123 My Street\"", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertEquals("North America", record.getValue("continent")); + + assertNull(reader.nextRecord(false, false)); + } + } + + + @Test + public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode"; + final String inputRecord = "1, John, 40.80, \"\"123 My Street\"\", My City, MS, 11111, USA"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // Use a CUSTOM format as FastCSV sometimes handles non-compliant data such as trimming spaces from values + 
format = CSVFormat.RFC4180.builder() + .setTrim(true) + .setIgnoreSurroundingSpaces(true) + .setAllowMissingColumnNames(true) + .build(); + + // Create a Record Reader that indicates that the header line is present but should be ignored. This should cause + // our schema to be the definitive list of what fields exist. + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + + // If schema says that there are fields a, b, c + // and the CSV has a header line that says field names are a, b + // and then the data has values 1,2,3 + // then a=1, b=2, c=null + assertNull(record.getValue("country")); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode"; + final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111, USA"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader
reader = createReader(bais, schema, CSVFormat.RFC4180.withTrim(), false)) { + + try { + final Record record = reader.nextRecord(); + fail("Should have thrown MalformedRecordException"); + } catch (MalformedRecordException mre) { + // Expected behavior + } + } + + // Create another Record Reader that indicates that the header line is present but should be ignored. This should cause + // our schema to be the definitive list of what fields exist. + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = new FastCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, CSVFormat.RFC4180.withTrim(), true, true, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) { + + // RFC-4180 does not allow missing column names + assertThrows(MalformedRecordException.class, reader::nextRecord); + } + } + + @Test + public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", My City, MS, 11111, USA, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // The data row carries an extra trailing column with no corresponding header field, so parsing should fail + try (final InputStream bais = new ByteArrayInputStream(inputData); + final FastCSVRecordReader reader = createReader(bais, schema, format)) { + + // RFC-4180 does not allow missing column names + assertThrows(MalformedRecordException.class, () -> reader.nextRecord(false, false)); + } + } + + @Test + public void testMultipleRecordsDelimitedWithSpecialChar() throws IOException, MalformedRecordException { + + char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
+ + final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter); + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_spec_delimiter.csv"); + final FastCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testMultipleRecordsEscapedWithNull() throws IOException, MalformedRecordException { + + final CSVFormat format = CSVFormat.DEFAULT.builder() + .setHeader() + .setSkipHeaderRecord(true) + .setTrim(true) + .setQuote('"') + .setDelimiter(",") + .setEscape(null) + .build(); + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapechar.csv"); + final FastCSVRecordReader reader = createReader(fis, schema, format, true)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[]{"1", "John Doe\\", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[]{"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv new file mode 100644 index 0000000000..7800751e61 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv @@ -0,0 +1,3 @@ +id,name,balance,address,city,state,zipCode,country +1,John Doe,"4750.89",123 My Street,My City,MS,11111,USA +2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv new file mode 100644 index 0000000000..1e6c6effef --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv @@ -0,0 +1,3 @@ +id,name,balance,address,city,state,zipCode,country +1,John Doe\,"4750.89","123 My Street",My City,MS,11111,USA +2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv new file mode 100644 index 0000000000..d0489e07e8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv @@ -0,0 +1,2 @@ +id,name,balance,address,city,state,zipCode,country +1,John Doe,4750.89,123 My Street,My City,MS,11111,USA \ No newline at end of file