diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 30f73c1079..cfc07bbd81 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -62,6 +62,11 @@
commons-csv
1.4
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-csv</artifactId>
+            <version>2.9.2</version>
+        </dependency>
commons-io
commons-io
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index a9b98f5721..9f133a6971 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -54,6 +54,26 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and assuming that all columns are of type String.");
+ // CSV parsers
+ // Allowable values for the "CSV Parser" property: each selects a different
+ // underlying library implementation used to parse incoming CSV data.
+ public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
+ "The CSV parser implementation from the Apache Commons CSV library.");
+
+ public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV",
+ "The CSV parser implementation from the Jackson Dataformats library.");
+
+
+ // User-facing property exposing the parser choice; defaults to the
+ // pre-existing Commons CSV implementation for backward compatibility.
+ public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder()
+ .name("csv-reader-csv-parser")
+ .displayName("CSV Parser")
+ .description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality "
+ + "and may also exhibit different levels of performance.")
+ .expressionLanguageSupported(false)
+ .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
+ .defaultValue(APACHE_COMMONS_CSV.getValue())
+ .required(true)
+ .build();
+
+ private volatile String csvParser;
private volatile CSVFormat csvFormat;
private volatile String dateFormat;
private volatile String timeFormat;
@@ -65,6 +85,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
@Override
protected List getSupportedPropertyDescriptors() {
final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(CSV_PARSER);
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@@ -83,6 +104,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
@OnEnabled
public void storeCsvFormat(final ConfigurationContext context) {
+ this.csvParser = context.getProperty(CSV_PARSER).getValue();
this.csvFormat = CSVUtils.createCSVFormat(context);
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
@@ -108,7 +130,13 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
 final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
 bufferedIn.reset();
- return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+ // Instantiate the reader matching the configured parser implementation
+ if (APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
+ return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+ } else if (JACKSON_CSV.getValue().equals(csvParser)) {
+ return new JacksonCSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+ } else {
+ throw new IOException("CSV parser '" + csvParser + "' is not supported");
+ }
 }
@Override
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
new file mode 100644
index 0000000000..a273d0cbff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.io.input.BOMInputStream;
+import org.apache.commons.lang3.CharUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+
+public class JacksonCSVRecordReader implements RecordReader {
+    private final RecordSchema schema;
+
+    // Lazily-supplied formats so DataTypeUtils only materializes a DateFormat when needed
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+    private final ComponentLog logger;
+    private final boolean hasHeader;
+    private final boolean ignoreHeader;
+    private final MappingIterator<String[]> recordStream;
+    private List<String> rawFieldNames = null;
+
+    // Shared mapper configured once; the reference is never reassigned, so 'final'
+    // is correct and the previous 'volatile' qualifier was unnecessary
+    private static final CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+    /**
+     * Creates a record reader backed by the Jackson CSV parser.
+     * The supplied CSVFormat is translated into the closest equivalent Jackson CsvSchema.
+     */
+    public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
+            final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
+
+        this.schema = schema;
+        this.logger = logger;
+        this.hasHeader = hasHeader;
+        this.ignoreHeader = ignoreHeader;
+        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+        // Honor the configured character set. Previously the no-charset constructor was
+        // used, which silently decoded with the platform default charset instead of
+        // the 'encoding' parameter (unlike CSVRecordReader).
+        final Reader reader = new InputStreamReader(new BOMInputStream(in), encoding);
+
+        CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
+                .setColumnSeparator(csvFormat.getDelimiter())
+                .setLineSeparator(csvFormat.getRecordSeparator())
+                // Can only use comments in Jackson CSV if the correct marker ('#') is set
+                .setAllowComments("#".equals(CharUtils.toString(csvFormat.getCommentMarker())))
+                // The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not
+                // be handled correctly when using the header for mapping.
+                .setUseHeader(false);
+
+        csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
+        csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
+
+        if (hasHeader && ignoreHeader) {
+            // Skip the header row entirely; the record schema supplies the field names
+            csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
+        }
+
+        final CsvSchema csvSchema = csvSchemaBuilder.build();
+
+        // Add remaining config options to the mapper
+        final List<CsvParser.Feature> features = new ArrayList<>(3);
+        features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
+        if (csvFormat.getIgnoreEmptyLines()) {
+            features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
+        }
+        if (csvFormat.getTrim()) {
+            features.add(CsvParser.Feature.TRIM_SPACES);
+        }
+
+        // Size the destination array to the list: the previous fixed-size
+        // 'new CsvParser.Feature[3]' left trailing null elements whenever fewer than
+        // three features were enabled, which withFeatures() would NPE on
+        final ObjectReader objReader = mapper.readerFor(String[].class)
+                .with(csvSchema)
+                .withFeatures(features.toArray(new CsvParser.Feature[features.size()]));
+
+        recordStream = objReader.readValues(reader);
+    }
+
+    /**
+     * Returns the next record from the stream, or null when no (non-empty) rows remain.
+     * Field values are coerced to the schema's types when coerceTypes is true; columns
+     * with no matching schema field are dropped when dropUnknownFields is true.
+     */
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final RecordSchema schema = getSchema();
+
+        if (recordStream.hasNext()) {
+            String[] csvRecord = recordStream.next();
+
+            // If the first record is the header names (and we're using them), store those off for use in creating the value map on the next iterations
+            if (rawFieldNames == null) {
+                if (!hasHeader || ignoreHeader) {
+                    rawFieldNames = schema.getFieldNames();
+                } else {
+                    rawFieldNames = Arrays.asList(csvRecord);
+
+                    // Advance the stream to keep the record count correct
+                    if (recordStream.hasNext()) {
+                        csvRecord = recordStream.next();
+                    } else {
+                        return null;
+                    }
+                }
+            }
+
+            // Check for empty lines and ignore them
+            boolean foundRecord = true;
+            if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
+                foundRecord = false;
+                while (recordStream.hasNext()) {
+                    csvRecord = recordStream.next();
+
+                    if (csvRecord != null && !(csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
+                        // This is a non-empty record/row, so continue processing
+                        foundRecord = true;
+                        break;
+                    }
+                }
+            }
+
+            // If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return
+            if (!foundRecord) {
+                return null;
+            }
+
+            final Map<String, Object> values = new LinkedHashMap<>();
+            final int numFieldNames = rawFieldNames.size();
+            for (int i = 0; i < csvRecord.length; i++) {
+                // Columns beyond the known field names get a synthetic name so raw reads can still expose them
+                final String rawFieldName = (i >= numFieldNames) ? "unknown_field_index_" + i : rawFieldNames.get(i);
+                // The loop bound guarantees i < csvRecord.length, so the element is always present
+                // (the previous '(i >= csvRecord.length) ? null : ...' guard was unreachable)
+                final String rawValue = csvRecord[i];
+
+                final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
+
+                if (!dataTypeOption.isPresent() && dropUnknownFields) {
+                    continue;
+                }
+
+                final Object value;
+                if (coerceTypes && dataTypeOption.isPresent()) {
+                    value = convert(rawValue, dataTypeOption.get(), rawFieldName);
+                } else if (dataTypeOption.isPresent()) {
+                    // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+                    // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+                    // the value into the desired type if it's a simple type.
+                    value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
+                } else {
+                    value = rawValue;
+                }
+
+                values.put(rawFieldName, value);
+            }
+
+            return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
+        }
+
+        return null;
+    }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ // Coerces a raw CSV string to the given DataType. Strips one pair of enclosing
+ // double quotes first; a quoted-empty or empty value becomes null.
+ protected Object convert(final String value, final DataType dataType, final String fieldName) {
+ if (dataType == null || value == null) {
+ return value;
+ }
+
+ final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+ if (trimmed.isEmpty()) {
+ return null;
+ }
+
+ return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ }
+
+ // Best-effort variant of convert(): only coerces simple scalar types, and only
+ // when the (de-quoted) value is actually compatible with the target type;
+ // otherwise the original string is returned unchanged.
+ private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
+ if (dataType == null || value == null) {
+ return value;
+ }
+
+ final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+ if (trimmed.isEmpty()) {
+ return null;
+ }
+
+ switch (dataType.getFieldType()) {
+ case STRING:
+ // Already a String; note the ORIGINAL (possibly quoted) value is returned here
+ return value;
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BYTE:
+ case CHAR:
+ case SHORT:
+ case TIME:
+ case TIMESTAMP:
+ case DATE:
+ if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
+ return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ } else {
+ // Incompatible value (e.g. non-numeric text in an INT field): fall back to the raw string
+ return value;
+ }
+ }
+
+ // Complex types (record/array/map/choice) are not coerced here
+ return value;
+ }
+
+ @Override
+ public void close() throws IOException {
+ recordStream.close();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
new file mode 100644
index 0000000000..30c05c00ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITApacheCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    // Builds the schema shared by the tests: every column is a String field
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int NUM_LINES = 2500000;
+        // Pre-size the builder so building ~130MB does not trigger repeated array copies
+        final StringBuilder sb = new StringBuilder(NUM_LINES * 60);
+        sb.append("id,name,balance,address,city,state,zipCode,country\n");
+        for (int i = 0; i < NUM_LINES; i++) {
+            sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Encode explicitly as UTF-8 to match the charset handed to the reader
+        // (no-arg getBytes() would use the platform default charset)
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(NUM_LINES, numRecords);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
new file mode 100644
index 0000000000..a3f1b378f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJacksonCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    // Builds the schema shared by the tests: every column is a String field
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int NUM_LINES = 2500000;
+        // Pre-size the builder so building ~130MB does not trigger repeated array copies
+        final StringBuilder sb = new StringBuilder(NUM_LINES * 60);
+        sb.append("id,name,balance,address,city,state,zipCode,country\n");
+        for (int i = 0; i < NUM_LINES; i++) {
+            sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Encode explicitly as UTF-8 to match the charset handed to the reader
+        // (no-arg getBytes() would use the platform default charset)
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
+             final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(NUM_LINES, numRecords);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
new file mode 100644
index 0000000000..9e085949bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestJacksonCSVRecordReader {
+ private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
+ private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+ private List getDefaultFields() {
+ final List fields = new ArrayList<>();
+ for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+ fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+ }
+ return fields;
+ }
+
+ private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException {
+ return new JacksonCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII");
+ }
+
+    @Test
+    public void testUTF8() throws IOException, MalformedRecordException {
+        final String text = "name\n黃凱揚";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        // Encode explicitly as UTF-8: no-arg getBytes() uses the platform default
+        // charset, which silently corrupts the multi-byte name on non-UTF-8 JVMs
+        // while the reader decodes the stream as UTF-8
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes("UTF-8"));
+            final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            final Record record = reader.nextRecord();
+            final String name = (String) record.getValue("name");
+
+            assertEquals("黃凱揚", name);
+        }
+    }
+
+ @Test
+ public void testDate() throws IOException, MalformedRecordException {
+ final String text = "date\n11/30/1983";
+
+ final List fields = new ArrayList<>();
+ fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+ final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+ "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+ final Record record = reader.nextRecord();
+ final Date date = (Date) record.getValue("date");
+ final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+ calendar.setTimeInMillis(date.getTime());
+
+ assertEquals(1983, calendar.get(Calendar.YEAR));
+ assertEquals(10, calendar.get(Calendar.MONTH));
+ assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+ }
+ }
+
+ @Test
+ public void testSimpleParse() throws IOException, MalformedRecordException {
+ final List fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
+ final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+ final Object[] record = reader.nextRecord().getValues();
+ final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+ Assert.assertArrayEquals(expectedValues, record);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ public void testMultipleRecords() throws IOException, MalformedRecordException {
+ final List fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
+ final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+ final Object[] firstRecord = reader.nextRecord().getValues();
+ final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+ Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+ final Object[] secondRecord = reader.nextRecord().getValues();
+ final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+ Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
+ final List fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
+ final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+ final Object[] firstRecord = reader.nextRecord().getValues();
+ final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+ Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+ final Object[] secondRecord = reader.nextRecord().getValues();
+ final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+ Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+ @Test
+ public void testMissingField() throws IOException, MalformedRecordException {
+ final List fields = getDefaultFields();
+ fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final String headerLine = "id, name, balance, address, city, state, zipCode, country";
+ final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111";
+ final String csvData = headerLine + "\n" + inputRecord;
+ final byte[] inputData = csvData.getBytes();
+
+ try (final InputStream bais = new ByteArrayInputStream(inputData);
+ final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+ final Record record = reader.nextRecord();
+ assertNotNull(record);
+
+ assertEquals("1", record.getValue("id"));
+ assertEquals("John", record.getValue("name"));
+ assertEquals(40.8D, record.getValue("balance"));
+ assertEquals("123 My Street", record.getValue("address"));
+ assertEquals("My City", record.getValue("city"));
+ assertEquals("MS", record.getValue("state"));
+ assertEquals("11111", record.getValue("zipCode"));
+ assertNull(record.getValue("country"));
+
+ assertNull(reader.nextRecord());
+ }
+ }
+
+    @Test
+    public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException {
+        // Schema contains 'country'; the CSV header instead ends with 'continent'.
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode, continent";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // nextRecord() coerces to the schema, so the 'continent' column (not a schema field) is dropped
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertNull(record.getValue("continent"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // nextRecord(false, false) returns the raw record, so 'continent' is preserved as-is
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertEquals("North America", record.getValue("continent"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+
+    @Test
+    public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException {
+        // Schema declares 'country' but the CSV header stops at 'zipCode';
+        // the data row nevertheless carries an eighth value ("USA").
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // Create another Record Reader that indicates that the header line is present but should be ignored. This should cause
+        // our schema to be the definitive list of what fields exist.
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            // But if we configure the reader to Ignore the header, then this will not occur!
+            assertEquals("USA", record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+    }
+
+    @Test
+    public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException {
+        // The data row has one more value ("North America") than the header declares.
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode, country";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // nextRecord(false, false) returns the raw record; the extra trailing value is kept
+        // under a synthetic field name derived from its column index
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertEquals("USA", record.getValue("country"));
+            assertEquals("North America", record.getValue("unknown_field_index_8"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+    @Test
+    public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException {
+
+        // Use the SOH control character (U+0001) as the column delimiter. The \u0001 escape is
+        // resolved by the Java compiler, so a plain char literal is sufficient here.
+        final char delimiter = '\u0001';
+
+        final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
+        final List<RecordField> fields = getDefaultFields();
+        // 'balance' is typed as double so the reader coerces the raw string values below
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
+            final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+            Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+            Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+}