mirror of https://github.com/apache/nifi.git
NIFI-4496: Added JacksonCSVRecordReader to allow choice of CSV parser. This closes #2245.
This commit is contained in:
parent
62e388aa4f
commit
14d2291db8
|
@ -62,6 +62,11 @@
|
|||
<artifactId>commons-csv</artifactId>
|
||||
<version>1.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-csv</artifactId>
|
||||
<version>2.9.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
|
|
|
@ -54,6 +54,26 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
|
|||
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
|
||||
+ "column names in the header and assuming that all columns are of type String.");
|
||||
|
||||
// CSV parsers
|
||||
public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
|
||||
"The CSV parser implementation from the Apache Commons CSV library.");
|
||||
|
||||
public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV",
|
||||
"The CSV parser implementation from the Jackson Dataformats library.");
|
||||
|
||||
|
||||
public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder()
|
||||
.name("csv-reader-csv-parser")
|
||||
.displayName("CSV Parser")
|
||||
.description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality "
|
||||
+ "and may also exhibit different levels of performance.")
|
||||
.expressionLanguageSupported(false)
|
||||
.allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
|
||||
.defaultValue(APACHE_COMMONS_CSV.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
private volatile String csvParser;
|
||||
private volatile CSVFormat csvFormat;
|
||||
private volatile String dateFormat;
|
||||
private volatile String timeFormat;
|
||||
|
@ -65,6 +85,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
|
|||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
properties.add(CSV_PARSER);
|
||||
properties.add(DateTimeUtils.DATE_FORMAT);
|
||||
properties.add(DateTimeUtils.TIME_FORMAT);
|
||||
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
|
||||
|
@ -83,6 +104,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
|
|||
|
||||
@OnEnabled
|
||||
public void storeCsvFormat(final ConfigurationContext context) {
|
||||
this.csvParser = context.getProperty(CSV_PARSER).getValue();
|
||||
this.csvFormat = CSVUtils.createCSVFormat(context);
|
||||
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
|
||||
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
|
||||
|
@ -108,7 +130,13 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
|
|||
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
|
||||
bufferedIn.reset();
|
||||
|
||||
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
|
||||
if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
|
||||
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
|
||||
} else if(JACKSON_CSV.getValue().equals(csvParser)) {
|
||||
return new JacksonCSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
|
||||
} else {
|
||||
throw new IOException("Parser not supported");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,251 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.csv;
|
||||
|
||||
import com.fasterxml.jackson.databind.MappingIterator;
|
||||
import com.fasterxml.jackson.databind.ObjectReader;
|
||||
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
|
||||
import com.fasterxml.jackson.dataformat.csv.CsvParser;
|
||||
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.io.input.BOMInputStream;
|
||||
import org.apache.commons.lang3.CharUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.text.DateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
||||
public class JacksonCSVRecordReader implements RecordReader {
|
||||
private final RecordSchema schema;
|
||||
|
||||
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||
|
||||
private final ComponentLog logger;
|
||||
private final boolean hasHeader;
|
||||
private final boolean ignoreHeader;
|
||||
private final MappingIterator<String[]> recordStream;
|
||||
private List<String> rawFieldNames = null;
|
||||
|
||||
private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
|
||||
|
||||
public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
|
||||
|
||||
this.schema = schema;
|
||||
this.logger = logger;
|
||||
this.hasHeader = hasHeader;
|
||||
this.ignoreHeader = ignoreHeader;
|
||||
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||
|
||||
LAZY_DATE_FORMAT = () -> df;
|
||||
LAZY_TIME_FORMAT = () -> tf;
|
||||
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||
|
||||
final Reader reader = new InputStreamReader(new BOMInputStream(in));
|
||||
|
||||
CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
|
||||
.setColumnSeparator(csvFormat.getDelimiter())
|
||||
.setLineSeparator(csvFormat.getRecordSeparator())
|
||||
// Can only use comments in Jackson CSV if the correct marker is set
|
||||
.setAllowComments("#" .equals(CharUtils.toString(csvFormat.getCommentMarker())))
|
||||
// The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not
|
||||
// be handled correctly when using the header for mapping.
|
||||
.setUseHeader(false);
|
||||
|
||||
csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
|
||||
csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
|
||||
|
||||
if (hasHeader) {
|
||||
if (ignoreHeader) {
|
||||
csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
|
||||
}
|
||||
}
|
||||
|
||||
CsvSchema csvSchema = csvSchemaBuilder.build();
|
||||
|
||||
// Add remaining config options to the mapper
|
||||
List<CsvParser.Feature> features = new ArrayList<>(3);
|
||||
features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
|
||||
if (csvFormat.getIgnoreEmptyLines()) {
|
||||
features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
|
||||
}
|
||||
if (csvFormat.getTrim()) {
|
||||
features.add(CsvParser.Feature.TRIM_SPACES);
|
||||
}
|
||||
|
||||
ObjectReader objReader = mapper.readerFor(String[].class)
|
||||
.with(csvSchema)
|
||||
.withFeatures(features.toArray(new CsvParser.Feature[3]));
|
||||
|
||||
recordStream = objReader.readValues(reader);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
|
||||
final RecordSchema schema = getSchema();
|
||||
|
||||
if (recordStream.hasNext()) {
|
||||
String[] csvRecord = recordStream.next();
|
||||
|
||||
// If the first record is the header names (and we're using them), store those off for use in creating the value map on the next iterations
|
||||
if (rawFieldNames == null) {
|
||||
if (!hasHeader || ignoreHeader) {
|
||||
rawFieldNames = schema.getFieldNames();
|
||||
} else {
|
||||
rawFieldNames = Arrays.asList(csvRecord);
|
||||
|
||||
// Advance the stream to keep the record count correct
|
||||
if (recordStream.hasNext()) {
|
||||
csvRecord = recordStream.next();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check for empty lines and ignore them
|
||||
boolean foundRecord = true;
|
||||
if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
|
||||
foundRecord = false;
|
||||
while (recordStream.hasNext()) {
|
||||
csvRecord = recordStream.next();
|
||||
|
||||
if (csvRecord != null && !(csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
|
||||
// This is a non-empty record/row, so continue processing
|
||||
foundRecord = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return
|
||||
if (!foundRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> values = new LinkedHashMap<>();
|
||||
final int numFieldNames = rawFieldNames.size();
|
||||
for (int i = 0; i < csvRecord.length; i++) {
|
||||
final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
|
||||
String rawValue = (i >= csvRecord.length) ? null : csvRecord[i];
|
||||
|
||||
final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
|
||||
|
||||
if (!dataTypeOption.isPresent() && dropUnknownFields) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final Object value;
|
||||
if (coerceTypes && dataTypeOption.isPresent()) {
|
||||
value = convert(rawValue, dataTypeOption.get(), rawFieldName);
|
||||
} else if (dataTypeOption.isPresent()) {
|
||||
// The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
|
||||
// dictate a field type. As a result, we will use the schema that we have to attempt to convert
|
||||
// the value into the desired type if it's a simple type.
|
||||
value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
|
||||
} else {
|
||||
value = rawValue;
|
||||
}
|
||||
|
||||
values.put(rawFieldName, value);
|
||||
}
|
||||
|
||||
return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSchema getSchema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
protected Object convert(final String value, final DataType dataType, final String fieldName) {
|
||||
if (dataType == null || value == null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
|
||||
if (trimmed.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||
}
|
||||
|
||||
private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
|
||||
if (dataType == null || value == null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
|
||||
if (trimmed.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
switch (dataType.getFieldType()) {
|
||||
case STRING:
|
||||
return value;
|
||||
case BOOLEAN:
|
||||
case INT:
|
||||
case LONG:
|
||||
case FLOAT:
|
||||
case DOUBLE:
|
||||
case BYTE:
|
||||
case CHAR:
|
||||
case SHORT:
|
||||
case TIME:
|
||||
case TIMESTAMP:
|
||||
case DATE:
|
||||
if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
|
||||
return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
recordStream.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.csv;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class ITApacheCSVRecordReader {
|
||||
|
||||
private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
|
||||
|
||||
private List<RecordField> getDefaultFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
|
||||
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
|
||||
}
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParserPerformance() throws IOException, MalformedRecordException {
|
||||
// Generates about 130MB of data
|
||||
final int NUM_LINES = 2500000;
|
||||
StringBuilder sb = new StringBuilder("id,name,balance,address,city,state,zipCode,country\n");
|
||||
for (int i = 0; i < NUM_LINES; i++) {
|
||||
sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
|
||||
}
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
|
||||
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
|
||||
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
|
||||
|
||||
Record record;
|
||||
int numRecords = 0;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
assertNotNull(record);
|
||||
numRecords++;
|
||||
}
|
||||
assertEquals(NUM_LINES, numRecords);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.csv;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class ITJacksonCSVRecordReader {
|
||||
|
||||
private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
|
||||
|
||||
private List<RecordField> getDefaultFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
|
||||
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
|
||||
}
|
||||
return fields;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParserPerformance() throws IOException, MalformedRecordException {
|
||||
// Generates about 130MB of data
|
||||
final int NUM_LINES = 2500000;
|
||||
StringBuilder sb = new StringBuilder("id,name,balance,address,city,state,zipCode,country\n");
|
||||
for (int i = 0; i < NUM_LINES; i++) {
|
||||
sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
|
||||
}
|
||||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
|
||||
final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
|
||||
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
|
||||
|
||||
Record record;
|
||||
int numRecords = 0;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
assertNotNull(record);
|
||||
numRecords++;
|
||||
}
|
||||
assertEquals(NUM_LINES, numRecords);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,371 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.csv;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.lang3.StringEscapeUtils;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.sql.Date;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.List;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
public class TestJacksonCSVRecordReader {
|
||||
private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
|
||||
private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
|
||||
|
||||
private List<RecordField> getDefaultFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
|
||||
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
|
||||
}
|
||||
return fields;
|
||||
}
|
||||
|
||||
private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException {
|
||||
return new JacksonCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
|
||||
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUTF8() throws IOException, MalformedRecordException {
|
||||
final String text = "name\n黃凱揚";
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
|
||||
final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
|
||||
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
final String name = (String)record.getValue("name");
|
||||
|
||||
assertEquals("黃凱揚", name);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDate() throws IOException, MalformedRecordException {
|
||||
final String text = "date\n11/30/1983";
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
|
||||
final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
|
||||
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
final Date date = (Date) record.getValue("date");
|
||||
final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
|
||||
calendar.setTimeInMillis(date.getTime());
|
||||
|
||||
assertEquals(1983, calendar.get(Calendar.YEAR));
|
||||
assertEquals(10, calendar.get(Calendar.MONTH));
|
||||
assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleParse() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
|
||||
final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
|
||||
|
||||
final Object[] record = reader.nextRecord().getValues();
|
||||
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
|
||||
Assert.assertArrayEquals(expectedValues, record);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRecords() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
|
||||
final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
|
||||
|
||||
final Object[] firstRecord = reader.nextRecord().getValues();
|
||||
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
|
||||
Assert.assertArrayEquals(firstExpectedValues, firstRecord);
|
||||
|
||||
final Object[] secondRecord = reader.nextRecord().getValues();
|
||||
final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
|
||||
Assert.assertArrayEquals(secondExpectedValues, secondRecord);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
|
||||
final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
|
||||
|
||||
final Object[] firstRecord = reader.nextRecord().getValues();
|
||||
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
|
||||
Assert.assertArrayEquals(firstExpectedValues, firstRecord);
|
||||
|
||||
final Object[] secondRecord = reader.nextRecord().getValues();
|
||||
final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
|
||||
Assert.assertArrayEquals(secondExpectedValues, secondRecord);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingField() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
|
||||
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111";
|
||||
final String csvData = headerLine + "\n" + inputRecord;
|
||||
final byte[] inputData = csvData.getBytes();
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals(40.8D, record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
assertNull(record.getValue("country"));
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final String headerLine = "id, name, balance, address, city, state, zipCode, continent";
|
||||
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North America";
|
||||
final String csvData = headerLine + "\n" + inputRecord;
|
||||
final byte[] inputData = csvData.getBytes();
|
||||
|
||||
// test nextRecord does not contain a 'continent' field
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals("40.80", record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
assertNull(record.getValue("country"));
|
||||
assertNull(record.getValue("continent"));
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
|
||||
// test nextRawRecord does contain 'continent' field
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
|
||||
|
||||
final Record record = reader.nextRecord(false, false);
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals("40.80", record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
assertNull(record.getValue("country"));
|
||||
assertEquals("North America", record.getValue("continent"));
|
||||
|
||||
assertNull(reader.nextRecord(false, false));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final String headerLine = "id, name, balance, address, city, state, zipCode";
|
||||
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA";
|
||||
final String csvData = headerLine + "\n" + inputRecord;
|
||||
final byte[] inputData = csvData.getBytes();
|
||||
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals("40.80", record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
|
||||
// If schema says that there are fields a, b, c
|
||||
// and the CSV has a header line that says field names are a, b
|
||||
// and then the data has values 1,2,3
|
||||
// then a=1, b=2, c=null
|
||||
assertNull(record.getValue("country"));
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
|
||||
// Create another Record Reader that indicates that the header line is present but should be ignored. This should cause
|
||||
// our schema to be the definitive list of what fields exist.
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true,
|
||||
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
|
||||
|
||||
final Record record = reader.nextRecord();
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals("40.80", record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
|
||||
// If schema says that there are fields a, b, c
|
||||
// and the CSV has a header line that says field names are a, b
|
||||
// and then the data has values 1,2,3
|
||||
// then a=1, b=2, c=null
|
||||
// But if we configure the reader to Ignore the header, then this will not occur!
|
||||
assertEquals("USA", record.getValue("country"));
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
|
||||
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA, North America";
|
||||
final String csvData = headerLine + "\n" + inputRecord;
|
||||
final byte[] inputData = csvData.getBytes();
|
||||
|
||||
// test nextRecord does not contain a 'continent' field
|
||||
try (final InputStream bais = new ByteArrayInputStream(inputData);
|
||||
final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
|
||||
|
||||
final Record record = reader.nextRecord(false, false);
|
||||
assertNotNull(record);
|
||||
|
||||
assertEquals("1", record.getValue("id"));
|
||||
assertEquals("John", record.getValue("name"));
|
||||
assertEquals("40.80", record.getValue("balance"));
|
||||
assertEquals("123 My Street", record.getValue("address"));
|
||||
assertEquals("My City", record.getValue("city"));
|
||||
assertEquals("MS", record.getValue("state"));
|
||||
assertEquals("11111", record.getValue("zipCode"));
|
||||
assertEquals("USA", record.getValue("country"));
|
||||
assertEquals("North America", record.getValue("unknown_field_index_8"));
|
||||
|
||||
assertNull(reader.nextRecord(false, false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException {
|
||||
|
||||
char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
|
||||
|
||||
final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
|
||||
final List<RecordField> fields = getDefaultFields();
|
||||
fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
|
||||
final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
|
||||
|
||||
final Object[] firstRecord = reader.nextRecord().getValues();
|
||||
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
|
||||
Assert.assertArrayEquals(firstExpectedValues, firstRecord);
|
||||
|
||||
final Object[] secondRecord = reader.nextRecord().getValues();
|
||||
final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
|
||||
Assert.assertArrayEquals(secondExpectedValues, secondRecord);
|
||||
|
||||
assertNull(reader.nextRecord());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue