From 14d2291db87d8ea160f538c10de31ac69fc996ae Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 1 Nov 2017 11:50:06 -0400 Subject: [PATCH] NIFI-4496: Added JacksonCSVRecordReader to allow choice of CSV parser. This closes #2245. --- .../pom.xml | 5 + .../java/org/apache/nifi/csv/CSVReader.java | 30 +- .../nifi/csv/JacksonCSVRecordReader.java | 251 ++++++++++++ .../nifi/csv/ITApacheCSVRecordReader.java | 74 ++++ .../nifi/csv/ITJacksonCSVRecordReader.java | 74 ++++ .../nifi/csv/TestJacksonCSVRecordReader.java | 371 ++++++++++++++++++ 6 files changed, 804 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 30f73c1079..cfc07bbd81 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -62,6 +62,11 @@ commons-csv 1.4 + + 
com.fasterxml.jackson.dataformat + jackson-dataformat-csv + 2.9.2 + commons-io commons-io diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index a9b98f5721..9f133a6971 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -54,6 +54,26 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact "The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the " + "column names in the header and assuming that all columns are of type String."); + // CSV parsers + public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV", + "The CSV parser implementation from the Apache Commons CSV library."); + + public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV", + "The CSV parser implementation from the Jackson Dataformats library."); + + + public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder() + .name("csv-reader-csv-parser") + .displayName("CSV Parser") + .description("Specifies which parser to use to read CSV records. 
NOTE: Different parsers may support different subsets of functionality " + + "and may also exhibit different levels of performance.") + .expressionLanguageSupported(false) + .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV) + .defaultValue(APACHE_COMMONS_CSV.getValue()) + .required(true) + .build(); + + private volatile String csvParser; private volatile CSVFormat csvFormat; private volatile String dateFormat; private volatile String timeFormat; @@ -65,6 +85,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(CSV_PARSER); properties.add(DateTimeUtils.DATE_FORMAT); properties.add(DateTimeUtils.TIME_FORMAT); properties.add(DateTimeUtils.TIMESTAMP_FORMAT); @@ -83,6 +104,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact @OnEnabled public void storeCsvFormat(final ConfigurationContext context) { + this.csvParser = context.getProperty(CSV_PARSER).getValue(); this.csvFormat = CSVUtils.createCSVFormat(context); this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); @@ -108,7 +130,13 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null); bufferedIn.reset(); - return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet); + if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) { + return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet); + } else if(JACKSON_CSV.getValue().equals(csvParser)) { + return new JacksonCSVRecordReader(bufferedIn, logger, schema, 
csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet); + } else { + throw new IOException("Parser not supported"); + } } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java new file mode 100644 index 0000000000..a273d0cbff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.csv;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.io.input.BOMInputStream;
+import org.apache.commons.lang3.CharUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+
+/**
+ * A {@link RecordReader} that reads CSV rows through the Jackson CSV parser and turns each row into a
+ * {@link MapRecord} keyed by field name.
+ *
+ * <p>Field names come from the first row of the data when a header is present and not ignored; otherwise
+ * they come from the supplied {@link RecordSchema}. Rows that consist of a single empty value are skipped.</p>
+ */
+public class JacksonCSVRecordReader implements RecordReader {
+    private final RecordSchema schema;
+
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+    private final ComponentLog logger;
+    private final boolean hasHeader;
+    private final boolean ignoreHeader;
+    private final MappingIterator<String[]> recordStream;
+    // Names used to key the values of every row; resolved on the first call to nextRecord()
+    private List<String> rawFieldNames = null;
+
+    // Configured once and never reassigned, so it is a constant rather than a volatile field
+    private static final CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+    /**
+     * Creates a reader over the given stream, translating the Commons CSV format settings into the
+     * equivalent Jackson CSV schema and parser features.
+     *
+     * @param in the stream of CSV data; a leading byte order mark is consumed by a {@link BOMInputStream}
+     * @param logger the component logger
+     * @param schema the schema used to name fields (when no header is used) and to look up field data types
+     * @param csvFormat supplies delimiter, record separator, comment marker, quote and escape characters,
+     *                  and the ignore-empty-lines and trim settings
+     * @param hasHeader whether the first row of the data is a header row
+     * @param ignoreHeader whether a present header row is skipped in favour of the schema's field names
+     * @param dateFormat pattern for date fields, or {@code null} for none
+     * @param timeFormat pattern for time fields, or {@code null} for none
+     * @param timestampFormat pattern for timestamp fields, or {@code null} for none
+     * @param encoding name of the character set used to decode the stream; when {@code null} the platform
+     *                 default is used
+     * @throws IOException if the character set is not supported or the parser cannot be created
+     */
+    public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
+                                  final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
+
+        this.schema = schema;
+        this.logger = logger;
+        this.hasHeader = hasHeader;
+        this.ignoreHeader = ignoreHeader;
+        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+        // Decode with the configured character set instead of silently using the platform default
+        final InputStream bomIn = new BOMInputStream(in);
+        final Reader reader = (encoding == null) ? new InputStreamReader(bomIn) : new InputStreamReader(bomIn, encoding);
+
+        CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
+                .setColumnSeparator(csvFormat.getDelimiter())
+                // Can only use comments in Jackson CSV if the correct marker is set
+                .setAllowComments("#".equals(CharUtils.toString(csvFormat.getCommentMarker())))
+                // The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not
+                // be handled correctly when using the header for mapping.
+                .setUseHeader(false);
+
+        // The record separator, quote and escape characters are optional in a CSVFormat; only override Jackson's defaults when they are set
+        if (csvFormat.getRecordSeparator() != null) {
+            csvSchemaBuilder = csvSchemaBuilder.setLineSeparator(csvFormat.getRecordSeparator());
+        }
+        if (csvFormat.getQuoteCharacter() != null) {
+            csvSchemaBuilder = csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
+        }
+        if (csvFormat.getEscapeCharacter() != null) {
+            csvSchemaBuilder = csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
+        }
+
+        if (hasHeader && ignoreHeader) {
+            csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
+        }
+
+        final CsvSchema csvSchema = csvSchemaBuilder.build();
+
+        // Add remaining config options to the mapper
+        final List<CsvParser.Feature> features = new ArrayList<>(3);
+        features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
+        if (csvFormat.getIgnoreEmptyLines()) {
+            features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
+        }
+        if (csvFormat.getTrim()) {
+            features.add(CsvParser.Feature.TRIM_SPACES);
+        }
+
+        // Size the array from the list so that no null elements are handed to withFeatures()
+        final ObjectReader objReader = mapper.readerFor(String[].class)
+                .with(csvSchema)
+                .withFeatures(features.toArray(new CsvParser.Feature[0]));
+
+        recordStream = objReader.readValues(reader);
+    }
+
+    /**
+     * Reads the next non-empty row and converts it into a record.
+     *
+     * @param coerceTypes whether values of fields known to the schema are converted to the schema's data type
+     * @param dropUnknownFields whether values whose field name is not in the schema are left out of the record
+     * @return the next record, or {@code null} when no further non-empty rows are available
+     */
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final RecordSchema recordSchema = getSchema();
+
+        if (!recordStream.hasNext()) {
+            return null;
+        }
+        String[] csvRecord = recordStream.next();
+
+        // If the first record is the header names (and we're using them), store those off for use in creating the value map on the next iterations
+        if (rawFieldNames == null) {
+            if (!hasHeader || ignoreHeader) {
+                rawFieldNames = recordSchema.getFieldNames();
+            } else {
+                rawFieldNames = Arrays.asList(csvRecord);
+
+                // Advance the stream to keep the record count correct
+                if (!recordStream.hasNext()) {
+                    return null;
+                }
+                csvRecord = recordStream.next();
+            }
+        }
+
+        // Skip empty rows; if only empty rows remain there is no record to return
+        while (isEmptyRow(csvRecord)) {
+            if (!recordStream.hasNext()) {
+                return null;
+            }
+            csvRecord = recordStream.next();
+        }
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        final int numFieldNames = rawFieldNames.size();
+        for (int i = 0; i < csvRecord.length; i++) {
+            // Values beyond the known field names get a synthetic name so they are not silently lost
+            final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
+            final String rawValue = csvRecord[i];
+
+            final Optional<DataType> dataTypeOption = recordSchema.getDataType(rawFieldName);
+
+            if (!dataTypeOption.isPresent() && dropUnknownFields) {
+                continue;
+            }
+
+            final Object value;
+            if (coerceTypes && dataTypeOption.isPresent()) {
+                value = convert(rawValue, dataTypeOption.get(), rawFieldName);
+            } else if (dataTypeOption.isPresent()) {
+                // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+                // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+                // the value into the desired type if it's a simple type.
+                value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
+            } else {
+                value = rawValue;
+            }
+
+            values.put(rawFieldName, value);
+        }
+
+        return new MapRecord(recordSchema, values, coerceTypes, dropUnknownFields);
+    }
+
+    /**
+     * @return the schema this reader was created with
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * Converts a raw CSV value to the given data type after removing one pair of surrounding double quotes.
+     *
+     * @param value the raw value, may be {@code null}
+     * @param dataType the target type, may be {@code null}
+     * @param fieldName the name of the field, passed on to the type conversion
+     * @return {@code value} unchanged when it or the data type is {@code null}, {@code null} when the
+     *         unquoted value is empty, otherwise the converted value
+     */
+    protected Object convert(final String value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = stripQuotes(value);
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+    }
+
+    /**
+     * Converts a raw CSV value to the given data type only when the type is one of the simple types listed
+     * below and the value is compatible with it; otherwise the raw value is returned as is.
+     */
+    private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = stripQuotes(value);
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        switch (dataType.getFieldType()) {
+            case STRING:
+                return value;
+            case BOOLEAN:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BYTE:
+            case CHAR:
+            case SHORT:
+            case TIME:
+            case TIMESTAMP:
+            case DATE:
+                if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
+                    return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                } else {
+                    return value;
+                }
+            default:
+                return value;
+        }
+    }
+
+    /**
+     * Tells whether a parsed row is absent or consists of exactly one empty value.
+     */
+    private static boolean isEmptyRow(final String[] row) {
+        return row == null || (row.length == 1 && StringUtils.isEmpty(row[0]));
+    }
+
+    /**
+     * Removes one pair of surrounding double quotes. The length check keeps a value that is a single
+     * quote character from being treated as both the opening and the closing quote.
+     */
+    private static String stripQuotes(final String value) {
+        final boolean quoted = value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"");
+        return quoted ? value.substring(1, value.length() - 1) : value;
+    }
+
+    /**
+     * Closes the underlying Jackson record stream.
+     */
+    @Override
+    public void close() throws IOException {
+        recordStream.close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
new file mode 100644
index 0000000000..30c05c00ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Integration test that pushes a large generated CSV document through {@link CSVRecordReader} and checks
+ * that every generated line comes back as a record.
+ */
+public class ITApacheCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    /**
+     * @return one STRING field for each column of the generated data
+     */
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int numLines = 2500000;
+        final String header = "id,name,balance,address,city,state,zipCode,country\n";
+        final String line = "1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n";
+
+        // Presize the builder so the buffer is not repeatedly regrown and copied while it fills up
+        final StringBuilder sb = new StringBuilder(header.length() + numLines * line.length());
+        sb.append(header);
+        for (int i = 0; i < numLines; i++) {
+            sb.append(line);
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Encode with the same character set the reader is told to decode with
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(numLines, numRecords);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
new file mode 100644
index 0000000000..a3f1b378f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Integration test that pushes a large generated CSV document through {@link JacksonCSVRecordReader} and
+ * checks that every generated line comes back as a record.
+ */
+public class ITJacksonCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    /**
+     * @return one STRING field for each column of the generated data
+     */
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int numLines = 2500000;
+        final String header = "id,name,balance,address,city,state,zipCode,country\n";
+        final String line = "1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n";
+
+        // Presize the builder so the buffer is not repeatedly regrown and copied while it fills up
+        final StringBuilder sb = new StringBuilder(header.length() + numLines * line.length());
+        sb.append(header);
+        for (int i = 0; i < numLines; i++) {
+            sb.append(line);
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Encode with the same character set the reader is told to decode with
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
+             final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(numLines, numRecords);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
new file mode 100644
index 0000000000..9e085949bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Date; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class TestJacksonCSVRecordReader { + private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType(); + private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"'); + + private List getDefaultFields() { + final List fields = new ArrayList<>(); + for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) { + fields.add(new RecordField(fieldName, 
RecordFieldType.STRING.getDataType())); + } + return fields; + } + + private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException { + return new JacksonCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII"); + } + + @Test + public void testUTF8() throws IOException, MalformedRecordException { + final String text = "name\n黃凱揚"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + final Record record = reader.nextRecord(); + final String name = (String)record.getValue("name"); + + assertEquals("黃凱揚", name); + } + } + + @Test + public void testDate() throws IOException, MalformedRecordException { + final String text = "date\n11/30/1983"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + final Record record = reader.nextRecord(); + final Date date = (Date) record.getValue("date"); + final Calendar calendar 
= Calendar.getInstance(TimeZone.getTimeZone("gmt")); + calendar.setTimeInMillis(date.getTime()); + + assertEquals(1983, calendar.get(Calendar.YEAR)); + assertEquals(10, calendar.get(Calendar.MONTH)); + assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH)); + } + } + + @Test + public void testSimpleParse() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv")); + final JacksonCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] record = reader.nextRecord().getValues(); + final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + Assert.assertArrayEquals(expectedValues, record); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testMultipleRecords() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv")); + final JacksonCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + Assert.assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + Assert.assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } + + @Test + public void testExtraWhiteSpace() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv")); + final JacksonCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + Assert.assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + Assert.assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } + + /** Data row has seven values for an eight-column header; asserts that the schema-typed "balance" is read as the double 40.8 and that the missing trailing "country" value is null. */ @Test + public void testMissingField() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals(40.8D, record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + + assertNull(reader.nextRecord()); + } + } + + /** Header names its last column "continent" instead of "country"; asserts that nextRecord() returns null for both names, while nextRecord(false, false) returns "North America" under "continent". NOTE(review): the meaning of the two boolean arguments is not visible in this chunk - confirm against the reader API. */ @Test + public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, continent"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test nextRecord does not contain a 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + 
assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertNull(record.getValue("continent")); + + assertNull(reader.nextRecord()); + } + + // test nextRecord(false, false) does contain 'continent' field + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertNull(record.getValue("country")); + assertEquals("North America", record.getValue("continent")); + + assertNull(reader.nextRecord(false, false)); + } + } + + + /** Header lists seven columns but the data row carries eight values; asserts that "country" is null for a reader obtained from createReader(...), and "USA" for a reader constructed to ignore the header line so that the schema alone defines the fields. */ @Test + public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", 
record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + + // If schema says that there are fields a, b, c + // and the CSV has a header line that says field names are a, b + // and then the data has values 1,2,3 + // then a=1, b=2, c=null + assertNull(record.getValue("country")); + + assertNull(reader.nextRecord()); + } + + // Create another Record Reader that indicates that the header line is present but should be ignored. This should cause + // our schema to be the definitive list of what fields exist. + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + final Record record = reader.nextRecord(); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + + // If schema says that there are fields a, b, c + // and the CSV has a header line that says field names are a, b + // and then the data has values 1,2,3 + // then a=1, b=2, c=null + // But if we configure the reader to Ignore the header, then this will not occur! 
+ assertEquals("USA", record.getValue("country")); + + assertNull(reader.nextRecord()); + } + + } + + /** Data row carries a ninth value that has no header column; asserts that nextRecord(false, false) exposes it under the generated name "unknown_field_index_8". */ @Test + public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException { + final List fields = getDefaultFields(); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final String headerLine = "id, name, balance, address, city, state, zipCode, country"; + final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA, North America"; + final String csvData = headerLine + "\n" + inputRecord; + final byte[] inputData = csvData.getBytes(); + + // test nextRecord(false, false) exposes the value without a header column as 'unknown_field_index_8' + try (final InputStream bais = new ByteArrayInputStream(inputData); + final JacksonCSVRecordReader reader = createReader(bais, schema, format)) { + + final Record record = reader.nextRecord(false, false); + assertNotNull(record); + + assertEquals("1", record.getValue("id")); + assertEquals("John", record.getValue("name")); + assertEquals("40.80", record.getValue("balance")); + assertEquals("123 My Street", record.getValue("address")); + assertEquals("My City", record.getValue("city")); + assertEquals("MS", record.getValue("state")); + assertEquals("11111", record.getValue("zipCode")); + assertEquals("USA", record.getValue("country")); + assertEquals("North America", record.getValue("unknown_field_index_8")); + + assertNull(reader.nextRecord(false, false)); + } + } + + /** Reads multi-bank-account_escapedchar.csv with a CSVFormat whose delimiter is taken from the U+0001 string below and whose quote character is a double quote; asserts two records and then null at end of input. */ @Test + public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException { + + /* the first character of the unescaped string is used as the field delimiter */ char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0); + + final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter); + final List fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv")); + final JacksonCSVRecordReader reader = createReader(fis, schema, format)) { + + final Object[] firstRecord = reader.nextRecord().getValues(); + final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; + Assert.assertArrayEquals(firstExpectedValues, firstRecord); + + final Object[] secondRecord = reader.nextRecord().getValues(); + final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"}; + Assert.assertArrayEquals(secondExpectedValues, secondRecord); + + assertNull(reader.nextRecord()); + } + } +}