mirror of https://github.com/apache/nifi.git
NIFI-6753: Fixed bug where all values being provided to the CSV Writer were String objects, which resulted in the CSV Writer improperly quoting numeric values when the schema indicates that the value is a number. Now, we will only convert the value to a String if the value is not a Number and/or the schema does not indicate a numeric type.
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #3797
This commit is contained in:
parent
2d90145842
commit
e394f6683a
|
@ -17,14 +17,6 @@
|
||||||
|
|
||||||
package org.apache.nifi.csv;
|
package org.apache.nifi.csv;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.util.LinkedHashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.csv.CSVFormat;
|
import org.apache.commons.csv.CSVFormat;
|
||||||
import org.apache.commons.csv.CSVPrinter;
|
import org.apache.commons.csv.CSVPrinter;
|
||||||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||||
|
@ -35,8 +27,17 @@ import org.apache.nifi.serialization.record.DataType;
|
||||||
import org.apache.nifi.serialization.record.RawRecordWriter;
|
import org.apache.nifi.serialization.record.RawRecordWriter;
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.RecordField;
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
|
public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
|
||||||
private final RecordSchema recordSchema;
|
private final RecordSchema recordSchema;
|
||||||
private final SchemaAccessWriter schemaWriter;
|
private final SchemaAccessWriter schemaWriter;
|
||||||
|
@ -142,13 +143,34 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (final RecordField recordField : recordSchema.getFields()) {
|
for (final RecordField recordField : recordSchema.getFields()) {
|
||||||
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
|
fieldValues[i++] = getFieldValue(record, recordField);
|
||||||
}
|
}
|
||||||
|
|
||||||
printer.printRecord(fieldValues);
|
printer.printRecord(fieldValues);
|
||||||
return schemaWriter.getAttributes(recordSchema);
|
return schemaWriter.getAttributes(recordSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Object getFieldValue(final Record record, final RecordField recordField) {
|
||||||
|
final RecordFieldType fieldType = recordField.getDataType().getFieldType();
|
||||||
|
|
||||||
|
switch (fieldType) {
|
||||||
|
case BIGINT:
|
||||||
|
case BYTE:
|
||||||
|
case DOUBLE:
|
||||||
|
case FLOAT:
|
||||||
|
case LONG:
|
||||||
|
case INT:
|
||||||
|
case SHORT:
|
||||||
|
final Object value = record.getValue(recordField);
|
||||||
|
if (value instanceof Number) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.getAsString(recordField, getFormat(recordField));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WriteResult writeRawRecord(final Record record) throws IOException {
|
public WriteResult writeRawRecord(final Record record) throws IOException {
|
||||||
// If we are not writing an active record set, then we need to ensure that we write the
|
// If we are not writing an active record set, then we need to ensure that we write the
|
||||||
|
|
|
@ -51,6 +51,43 @@ import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestWriteCSVResult {
|
public class TestWriteCSVResult {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumbersNotQuoted() throws IOException {
|
||||||
|
final Map<String, Object> values = new HashMap<>();
|
||||||
|
values.put("name", "John Doe");
|
||||||
|
values.put("age", 30);
|
||||||
|
|
||||||
|
final List<RecordField> schemaFields = new ArrayList<>();
|
||||||
|
schemaFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
|
||||||
|
schemaFields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
|
||||||
|
|
||||||
|
final RecordSchema schema = new SimpleRecordSchema(schemaFields);
|
||||||
|
final Record record = new MapRecord(schema, values);
|
||||||
|
|
||||||
|
// Test with Non-Numeric Quote Mode
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.NON_NUMERIC).withRecordSeparator("\n");
|
||||||
|
try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
|
||||||
|
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) {
|
||||||
|
result.writeRecord(record);
|
||||||
|
}
|
||||||
|
|
||||||
|
String output = baos.toString();
|
||||||
|
assertEquals("\"name\",\"age\"\n\"John Doe\",30\n", output);
|
||||||
|
|
||||||
|
baos.reset();
|
||||||
|
|
||||||
|
// Test with MINIMAL Quote Mode
|
||||||
|
csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.MINIMAL).withRecordSeparator("\n");
|
||||||
|
try (final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
|
||||||
|
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) {
|
||||||
|
result.writeRecord(record);
|
||||||
|
}
|
||||||
|
|
||||||
|
output = baos.toString();
|
||||||
|
assertEquals("name,age\nJohn Doe,30\n", output);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDataTypes() throws IOException {
|
public void testDataTypes() throws IOException {
|
||||||
final CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL).withRecordSeparator("\n");
|
final CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL).withRecordSeparator("\n");
|
||||||
|
|
Loading…
Reference in New Issue