NIFI-4534 Choose Character Set for CSV Record Read/Write streams

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2229.
This commit is contained in:
patricker 2017-10-27 13:42:28 +08:00 committed by Pierre Villard
parent 856dedab12
commit e3482cc772
7 changed files with 53 additions and 20 deletions

View File

@ -117,6 +117,15 @@ public class CSVUtils {
.defaultValue("true") .defaultValue("true")
.required(true) .required(true)
.build(); .build();
/**
 * Character set used when decoding CSV input and encoding CSV output.
 *
 * Expression Language is declared unsupported because the services that use this
 * property read it with a plain getValue() and hand the result directly to the
 * stream reader/writer as a charset name. With Expression Language enabled, an
 * unevaluated "${...}" value would bypass the character-set validator and only
 * fail later, when the stream is opened.
 */
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    .name("csvutils-character-set")
    .displayName("Character Set")
    .description("The Character Encoding that is used to encode/decode the CSV file")
    .expressionLanguageSupported(false)
    .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    .defaultValue("UTF-8")
    .required(true)
    .build();
// CSV Format fields for writers only // CSV Format fields for writers only
public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character."); public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");

View File

@ -60,7 +60,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
private volatile String timestampFormat; private volatile String timestampFormat;
private volatile boolean firstLineIsHeader; private volatile boolean firstLineIsHeader;
private volatile boolean ignoreHeader; private volatile boolean ignoreHeader;
private volatile String charSet;
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -77,6 +77,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
properties.add(CSVUtils.COMMENT_MARKER); properties.add(CSVUtils.COMMENT_MARKER);
properties.add(CSVUtils.NULL_STRING); properties.add(CSVUtils.NULL_STRING);
properties.add(CSVUtils.TRIM_FIELDS); properties.add(CSVUtils.TRIM_FIELDS);
properties.add(CSVUtils.CHARSET);
return properties; return properties;
} }
@ -88,6 +89,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean(); this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean();
this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean(); this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean();
this.charSet = context.getProperty(CSVUtils.CHARSET).getValue();
// Ensure that if we are deriving schema from header that we always treat the first line as a header, // Ensure that if we are deriving schema from header that we always treat the first line as a header,
// regardless of the 'First Line is Header' property // regardless of the 'First Line is Header' property
@ -106,7 +108,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null); final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
bufferedIn.reset(); bufferedIn.reset();
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat); return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
} }
@Override @Override

View File

@ -56,7 +56,7 @@ public class CSVRecordReader implements RecordReader {
private List<String> rawFieldNames; private List<String> rawFieldNames;
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException { final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
this.schema = schema; this.schema = schema;
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
@ -67,7 +67,7 @@ public class CSVRecordReader implements RecordReader {
LAZY_TIME_FORMAT = () -> tf; LAZY_TIME_FORMAT = () -> tf;
LAZY_TIMESTAMP_FORMAT = () -> tsf; LAZY_TIMESTAMP_FORMAT = () -> tsf;
final Reader reader = new InputStreamReader(new BOMInputStream(in)); final Reader reader = new InputStreamReader(new BOMInputStream(in), encoding);
CSVFormat withHeader; CSVFormat withHeader;
if (hasHeader) { if (hasHeader) {

View File

@ -43,6 +43,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
private volatile CSVFormat csvFormat; private volatile CSVFormat csvFormat;
private volatile boolean includeHeader; private volatile boolean includeHeader;
private volatile String charSet;
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -58,6 +59,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
properties.add(CSVUtils.QUOTE_MODE); properties.add(CSVUtils.QUOTE_MODE);
properties.add(CSVUtils.RECORD_SEPARATOR); properties.add(CSVUtils.RECORD_SEPARATOR);
properties.add(CSVUtils.TRAILING_DELIMITER); properties.add(CSVUtils.TRAILING_DELIMITER);
properties.add(CSVUtils.CHARSET);
return properties; return properties;
} }
@ -65,11 +67,12 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
public void storeCsvFormat(final ConfigurationContext context) { public void storeCsvFormat(final ConfigurationContext context) {
this.csvFormat = CSVUtils.createCSVFormat(context); this.csvFormat = CSVUtils.createCSVFormat(context);
this.includeHeader = context.getProperty(CSVUtils.INCLUDE_HEADER_LINE).asBoolean(); this.includeHeader = context.getProperty(CSVUtils.INCLUDE_HEADER_LINE).asBoolean();
this.charSet = context.getProperty(CSVUtils.CHARSET).getValue();
} }
@Override @Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader); getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
} }
} }

View File

@ -50,7 +50,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
private String[] fieldNames; private String[] fieldNames;
public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out, public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException { final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException {
super(out); super(out);
this.recordSchema = recordSchema; this.recordSchema = recordSchema;
@ -61,7 +61,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
this.includeHeaderLine = includeHeaderLine; this.includeHeaderLine = includeHeaderLine;
final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true); final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true);
final OutputStreamWriter streamWriter = new OutputStreamWriter(out); final OutputStreamWriter streamWriter = new OutputStreamWriter(out, charSet);
printer = new CSVPrinter(streamWriter, formatWithHeader); printer = new CSVPrinter(streamWriter, formatWithHeader);
fieldValues = new Object[recordSchema.getFieldCount()]; fieldValues = new Object[recordSchema.getFieldCount()];

View File

@ -60,7 +60,26 @@ public class TestCSVRecordReader {
private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException { private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException {
return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII");
}
@Test
public void testUTF8() throws IOException, MalformedRecordException {
final String text = "name\n黃凱揚";
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord();
final String name = (String)record.getValue("name");
assertEquals("黃凱揚", name);
}
} }
@Test @Test
@ -72,8 +91,8 @@ public class TestCSVRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields); final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord(); final Record record = reader.nextRecord();
final java.sql.Date date = (Date) record.getValue("date"); final java.sql.Date date = (Date) record.getValue("date");
@ -268,7 +287,7 @@ public class TestCSVRecordReader {
// our schema to be the definitive list of what fields exist. // our schema to be the definitive list of what fields exist.
try (final InputStream bais = new ByteArrayInputStream(inputData); try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
final Record record = reader.nextRecord(); final Record record = reader.nextRecord();
assertNotNull(record); assertNotNull(record);

View File

@ -76,10 +76,10 @@ public class TestWriteCSVResult {
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) {
final Map<String, Object> valueMap = new HashMap<>(); final Map<String, Object> valueMap = new HashMap<>();
valueMap.put("string", "string"); valueMap.put("string", "a孟bc李12儒3");
valueMap.put("boolean", true); valueMap.put("boolean", true);
valueMap.put("byte", (byte) 1); valueMap.put("byte", (byte) 1);
valueMap.put("char", 'c'); valueMap.put("char", 'c');
@ -113,7 +113,7 @@ public class TestWriteCSVResult {
final String values = splits[1]; final String values = splits[1];
final StringBuilder expectedBuilder = new StringBuilder(); final StringBuilder expectedBuilder = new StringBuilder();
expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); expectedBuilder.append("\"a孟bc李12儒3\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",");
final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
@ -143,7 +143,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.write(record); writer.write(record);
@ -170,7 +170,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.writeRawRecord(record); writer.writeRawRecord(record);
@ -197,7 +197,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.writeRecord(record); writer.writeRecord(record);
@ -224,7 +224,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.writeRawRecord(record); writer.writeRawRecord(record);
@ -253,7 +253,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.writeRecord(record); writer.writeRecord(record);
@ -281,7 +281,7 @@ public class TestWriteCSVResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output; final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
writer.beginRecordSet(); writer.beginRecordSet();
writer.writeRawRecord(record); writer.writeRawRecord(record);