From 7cb3bce6a7bfb9290f4311d35048b710ebe8e989 Mon Sep 17 00:00:00 2001 From: Peter Gyori <63872658+pgyori@users.noreply.github.com> Date: Wed, 9 Jun 2021 21:22:22 +0200 Subject: [PATCH] NIFI-8622: Reuse of GenericRecord instance eliminated from AvroReaderWithExplicitSchema to avoid issues when parsing fields of type "bytes" (#5090) --- .../pom.xml | 2 + .../avro/AvroReaderWithExplicitSchema.java | 2 +- .../TestAvroReaderWithExplicitSchema.java | 63 ++++++++++++++++++ .../avro/avro_schemaless_decimal.avro | Bin 0 -> 47 bytes .../avro/avro_schemaless_decimal.avsc | 22 ++++++ 5 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avro create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avsc diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 31018ac5e4..e36bcaee47 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -134,6 +134,8 @@ src/test/resources/avro/avro_embed_schema.avro src/test/resources/avro/avro_schemaless.avro + src/test/resources/avro/avro_schemaless_decimal.avro + src/test/resources/avro/avro_schemaless_decimal.avsc src/test/resources/avro/datatypes.avsc src/test/resources/avro/decimals.avsc src/test/resources/avro/logical-types.avsc diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index 9c0dbf5e1b..ab20aad811 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -89,7 +89,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { } try { - genericRecord = datumReader.read(genericRecord, decoder); + genericRecord = datumReader.read(null, decoder); } catch (final EOFException eof) { return null; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 5d3cd00fba..72d1e046cd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -18,13 +18,20 @@ package org.apache.nifi.avro; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -96,4 +103,60 @@ public class TestAvroReaderWithExplicitSchema { // Causes IOException in constructor due to schemas not matching new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema); } + + @Test + public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception { + // GIVEN + String avroFilePath = "src/test/resources/avro/avro_schemaless_decimal.avro"; + String avroSchemaPath = "src/test/resources/avro/avro_schemaless_decimal.avsc"; + + List> expectedRecords = new ArrayList<>(4); + Map record = new HashMap<>(); + record.put("id", "AAAAAA"); + record.put("price", "0.000000"); + expectedRecords.add(record); + record = new HashMap<>(); + record.put("id", "BBBBBB"); + record.put("price", "15000.000000"); + expectedRecords.add(record); + record = new HashMap<>(); + record.put("id", "CCCCCC"); + record.put("price", "0.000000"); + expectedRecords.add(record); + record = new HashMap<>(); + record.put("id", "DDDDDD"); + record.put("price", "12.340000"); + expectedRecords.add(record); + + // WHEN + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema(avroFilePath, avroSchemaPath); + List actualRecords = collectRecords(avroReader); + + // THEN + assertEquals(expectedRecords.size(), actualRecords.size()); + for (int i = 0; i < expectedRecords.size(); ++i) { + Map expectedRecord = expectedRecords.get(i); + Record actualRecord = actualRecords.get(i); + + assertEquals(expectedRecord.get("id"), actualRecord.getValue("id").toString()); + assertEquals(expectedRecord.get("price"), actualRecord.getValue("price").toString()); + } + } + + private AvroReaderWithExplicitSchema createAvroReaderWithExplicitSchema(String avroFilePath, String avroSchemaPath) throws IOException { + FileInputStream avroFileInputStream = new FileInputStream(avroFilePath); + Schema avroSchema = new Schema.Parser().parse(new FileInputStream(avroSchemaPath)); + RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + return new AvroReaderWithExplicitSchema(avroFileInputStream, recordSchema, avroSchema); + } + + private List collectRecords(RecordReader reader) throws IOException, MalformedRecordException { + List records = new ArrayList<>(); + Record record = reader.nextRecord(); + while (record != null) { + records.add(record); + record = reader.nextRecord(); + } + return records; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avro new file mode 100644 index 0000000000000000000000000000000000000000..e3b7ae18359ae30a7f1a4acf31de7f6ee95f9488 GIT binary patch literal 47 mcmd;a1Op}}1|BCcVB%t~6THU2;|vBMaThRP;$Yb0tpEViKM5KD literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avsc new file mode 100644 index 0000000000..ed6cbce8d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless_decimal.avsc @@ -0,0 +1,22 @@ +{ + "type": "record", + "name": "DecimalTest", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "price", + "type": [ + "null", + { + "type": "bytes", + "logicalType": "decimal", + "precision": 20, + "scale": 6 + } + ] + } + ] +}