From e3482cc772c9e6324382aeb09a09f7a93898ece1 Mon Sep 17 00:00:00 2001 From: patricker Date: Fri, 27 Oct 2017 13:42:28 +0800 Subject: [PATCH] NIFI-4534 Choose Character Set for CSV Record Read/Write streams Signed-off-by: Pierre Villard This closes #2229. --- .../java/org/apache/nifi/csv/CSVUtils.java | 9 +++++++ .../java/org/apache/nifi/csv/CSVReader.java | 6 +++-- .../org/apache/nifi/csv/CSVRecordReader.java | 4 +-- .../apache/nifi/csv/CSVRecordSetWriter.java | 5 +++- .../org/apache/nifi/csv/WriteCSVResult.java | 4 +-- .../apache/nifi/csv/TestCSVRecordReader.java | 27 ++++++++++++++++--- .../apache/nifi/csv/TestWriteCSVResult.java | 18 ++++++------- 7 files changed, 53 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java index bc074b329b..eecf29037b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ -117,6 +117,15 @@ public class CSVUtils { .defaultValue("true") .required(true) .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("csvutils-character-set") + .displayName("Character Set") + .description("The Character Encoding that is used to encode/decode the CSV file") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(true) + .build(); // CSV Format fields for writers only public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character."); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index f15f85d60a..a9b98f5721 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -60,7 +60,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact private volatile String timestampFormat; private volatile boolean firstLineIsHeader; private volatile boolean ignoreHeader; - + private volatile String charSet; @Override protected List getSupportedPropertyDescriptors() { @@ -77,6 +77,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact properties.add(CSVUtils.COMMENT_MARKER); properties.add(CSVUtils.NULL_STRING); properties.add(CSVUtils.TRIM_FIELDS); + properties.add(CSVUtils.CHARSET); return properties; } @@ -88,6 +89,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean(); this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean(); + this.charSet = context.getProperty(CSVUtils.CHARSET).getValue(); // Ensure that if we are deriving schema from header that we always treat the first line as a header, // regardless of the 'First Line is Header' property @@ -106,7 +108,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null); bufferedIn.reset(); - return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat); + return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet); } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index 18bea6b88b..70aaba9e51 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -56,7 +56,7 @@ public class CSVRecordReader implements RecordReader { private List rawFieldNames; public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, - final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException { + final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException { this.schema = schema; final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); @@ -67,7 +67,7 @@ public class CSVRecordReader implements RecordReader { LAZY_TIME_FORMAT = () -> tf; LAZY_TIMESTAMP_FORMAT = () -> tsf; - final Reader reader = new InputStreamReader(new BOMInputStream(in)); + final Reader reader = new InputStreamReader(new BOMInputStream(in), encoding); CSVFormat withHeader; if (hasHeader) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index bd2e60042f..7aab5a36e6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -43,6 +43,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R private volatile CSVFormat csvFormat; private volatile boolean includeHeader; + private volatile String charSet; @Override protected List getSupportedPropertyDescriptors() { @@ -58,6 +59,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R properties.add(CSVUtils.QUOTE_MODE); properties.add(CSVUtils.RECORD_SEPARATOR); properties.add(CSVUtils.TRAILING_DELIMITER); + properties.add(CSVUtils.CHARSET); return properties; } @@ -65,11 +67,12 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R public void storeCsvFormat(final ConfigurationContext context) { this.csvFormat = CSVUtils.createCSVFormat(context); this.includeHeader = context.getProperty(CSVUtils.INCLUDE_HEADER_LINE).asBoolean(); + this.charSet = context.getProperty(CSVUtils.CHARSET).getValue(); } @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, - getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader); + getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index 82d687a42a..849b1ca682 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -50,7 +50,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet private String[] fieldNames; public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out, - final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException { + final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException { super(out); this.recordSchema = recordSchema; @@ -61,7 +61,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet this.includeHeaderLine = includeHeaderLine; final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true); - final OutputStreamWriter streamWriter = new OutputStreamWriter(out); + final OutputStreamWriter streamWriter = new OutputStreamWriter(out, charSet); printer = new CSVPrinter(streamWriter, formatWithHeader); fieldValues = new Object[recordSchema.getFieldCount()]; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java index 576132fb9f..23eb95bca3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java @@ -60,7 +60,26 @@ public class TestCSVRecordReader { private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException { return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII"); + } + + @Test + public void testUTF8() throws IOException, MalformedRecordException { + final String text = "name\n黃凱揚"; + + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + final Record record = reader.nextRecord(); + final String name = (String)record.getValue("name"); + + assertEquals("黃凱揚", name); + } } @Test @@ -72,8 +91,8 @@ public class TestCSVRecordReader { final RecordSchema schema = new SimpleRecordSchema(fields); try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); - final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, - "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false, + "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { final Record record = reader.nextRecord(); final java.sql.Date date = (Date) record.getValue("date"); @@ -268,7 +287,7 @@ public class TestCSVRecordReader { // our schema to be the definitive list of what fields exist. try (final InputStream bais = new ByteArrayInputStream(inputData); final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { final Record record = reader.nextRecord(); assertNotNull(record); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index 0285796d61..6f4269a923 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -76,10 +76,10 @@ public class TestWriteCSVResult { final long now = System.currentTimeMillis(); try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) { final Map valueMap = new HashMap<>(); - valueMap.put("string", "string"); + valueMap.put("string", "a孟bc李12儒3"); valueMap.put("boolean", true); valueMap.put("byte", (byte) 1); valueMap.put("char", 'c'); @@ -113,7 +113,7 @@ public class TestWriteCSVResult { final String values = splits[1]; final StringBuilder expectedBuilder = new StringBuilder(); - expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); + expectedBuilder.append("\"a孟bc李12儒3\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); @@ -143,7 +143,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.write(record); @@ -170,7 +170,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record); @@ -197,7 +197,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRecord(record); @@ -224,7 +224,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record); @@ -253,7 +253,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRecord(record); @@ -281,7 +281,7 @@ public class TestWriteCSVResult { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String output; try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, - RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) { + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { writer.beginRecordSet(); writer.writeRawRecord(record);