diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
index bded6fa539..58d5d6be54 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -157,7 +158,14 @@ class CSVUtils {
                     NumberFormat numberFormat = DecimalFormat.getInstance();
                     numberFormat.setGroupingUsed(false);
                     normalizeNumberFormat(numberFormat, scale, precision);
-                    final String rawValue = new String(((ByteBuffer)fieldValue).array());
+                    // parse() returns a Long or Double unless told otherwise; ask for a BigDecimal so that
+                    // values with more significant digits than a double can hold are not rounded
+                    if (numberFormat instanceof DecimalFormat) {
+                        ((DecimalFormat) numberFormat).setParseBigDecimal(true);
+                    }
+                    String rawValue = new String(((ByteBuffer)fieldValue).array());
+                    // raw value needs to be parsed to ensure that BigDecimal will not throw an exception for specific locale
+                    rawValue = numberFormat.parse(rawValue).toString();
                     out.write(numberFormat.format(new BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
                 } else {
                     out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
@@ -167,7 +175,7 @@ class CSVUtils {
                     delimiterToUse = String.valueOf(delimiter);
                 }
             }
-        } catch (IOException e) {
+        } catch (IOException | ParseException e) {
             throw new IllegalStateException("Failed to parse AVRO Record", e);
         }
     }
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java index dabbc17a3b..e1de37261f 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java @@ -22,9 +22,12 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; +import java.text.DecimalFormat; +import java.text.NumberFormat; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -122,46 +125,21 @@ public class TransformersTest { "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt", "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt", "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt", - "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt", - "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt", - "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt", 
"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"})
     public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception {
-
         final String data = getResourceAsString(inputCSVFileName);
         final String schemaText = getResourceAsString(inputAvroSchema);
         final String result = getResourceAsString(expectedOuput);
-
-        Schema schema = new Schema.Parser().parse(schemaText);
-
-
-        // CSV -> AVRO -> CSV
-
-        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-        GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        AvroUtils.write(record, out);
-        byte[] avro = out.toByteArray();
-
-        in = new ByteArrayInputStream(avro);
-        record = AvroUtils.read(in, schema);
-        out = new ByteArrayOutputStream();
-        CSVUtils.write(record, '|', out);
-        byte[] csv = out.toByteArray();
-        assertEquals(result, new String(csv, StandardCharsets.UTF_8));
-
+        csvRoundTrip(data, schemaText, result);
     }
 
     @Test
     @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt",
             "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"})
     public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) {
-
         try {
             final String data = getResourceAsString(inputCSVFileName);
-
             final String schemaText = getResourceAsString(inputAvroSchema);
-
             Schema schema = new Schema.Parser().parse(schemaText);
 
             ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
@@ -171,7 +149,66 @@ public class TransformersTest {
         }catch(IllegalArgumentException iae){
             assertTrue(true);
         }
+    }
 
+    /**
+     * Round-trips the decimal logical-type fixtures through CSV -> Avro -> CSV. Expected values are
+     * built with the default-locale {@link NumberFormat} instead of being read from fixture files.
+     */
+    @Test
+    public void testCSVRoundTrip() throws IOException {
+        NumberFormat numberFormat = DecimalFormat.getInstance();
+        numberFormat.setGroupingUsed(false);
+        ((DecimalFormat) numberFormat).setParseBigDecimal(true);
+
+        //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt",
+        String decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal("11234567.89"));
+        String data = getResourceAsString("input_csv/decimal_logicalType.txt");
+        String schemaText = getResourceAsString("input_avro/decimal_logicalType_invalid_scale_with_default.txt");
+        csvRoundTrip(data, schemaText, decimalLogicalType);
+
+        // needs to be set now because scale < precision
+        numberFormat.setMaximumIntegerDigits(10);
+        numberFormat.setMaximumFractionDigits(3);
+        numberFormat.setMinimumFractionDigits(3);
+
+        //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt",
+        decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal("11234567.890"));
+        data = getResourceAsString("input_csv/decimal_logicalType.txt");
+        schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_no_default.txt");
+        csvRoundTrip(data, schemaText, decimalLogicalType);
+
+        //"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt",
+        decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(BigDecimal.ZERO);
+        data = getResourceAsString("input_csv/decimal_logicalType_missing_value.txt");
+        schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_default.txt");
+        csvRoundTrip(data, schemaText, decimalLogicalType);
+    }
+
+    /**
+     * Converts the CSV text to an Avro record, serializes and re-reads that record, writes it back
+     * as CSV and asserts that the output equals {@code result}.
+     *
+     * @param data '|'-delimited CSV input
+     * @param schemaText Avro schema text used for both conversions
+     * @param result expected CSV output
+     */
+    private void csvRoundTrip(final String data, final String schemaText, final String result) {
+        Schema schema = new Schema.Parser().parse(schemaText);
+
+        // CSV -> AVRO -> CSV
+        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        AvroUtils.write(record, out);
+        byte[] avro = out.toByteArray();
+
+        in = new ByteArrayInputStream(avro);
+        record = AvroUtils.read(in, schema);
+        out = new ByteArrayOutputStream();
+        CSVUtils.write(record, '|', out);
+        byte[] csv = out.toByteArray();
+        assertEquals(result, new String(csv, StandardCharsets.UTF_8));
     }
 
     /**