NIFI-8622: Reuse of GenericRecord instance eliminated from AvroReaderWithExplicitSchema to avoid issues when parsing fields of type "bytes" (#5090)

This commit is contained in:
Peter Gyori 2021-06-09 21:22:22 +02:00 committed by GitHub
parent d44dec7345
commit 7cb3bce6a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 1 deletions

View File

@ -134,6 +134,8 @@
<excludes combine.children="append">
<exclude>src/test/resources/avro/avro_embed_schema.avro</exclude>
<exclude>src/test/resources/avro/avro_schemaless.avro</exclude>
<exclude>src/test/resources/avro/avro_schemaless_decimal.avro</exclude>
<exclude>src/test/resources/avro/avro_schemaless_decimal.avsc</exclude>
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
<exclude>src/test/resources/avro/decimals.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude>

View File

@ -89,7 +89,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
}
try {
genericRecord = datumReader.read(genericRecord, decoder);
genericRecord = datumReader.read(null, decoder);
} catch (final EOFException eof) {
return null;
}

View File

@ -18,13 +18,20 @@ package org.apache.nifi.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -96,4 +103,60 @@ public class TestAvroReaderWithExplicitSchema {
// Causes IOException in constructor due to schemas not matching
new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
}
@Test
public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception {
// GIVEN
String avroFilePath = "src/test/resources/avro/avro_schemaless_decimal.avro";
String avroSchemaPath = "src/test/resources/avro/avro_schemaless_decimal.avsc";
List<Map<String, String>> expectedRecords = new ArrayList<>(4);
Map<String, String> record = new HashMap<>();
record.put("id", "AAAAAA");
record.put("price", "0.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "BBBBBB");
record.put("price", "15000.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "CCCCCC");
record.put("price", "0.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "DDDDDD");
record.put("price", "12.340000");
expectedRecords.add(record);
// WHEN
AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema(avroFilePath, avroSchemaPath);
List<Record> actualRecords = collectRecords(avroReader);
// THEN
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); ++i) {
Map<String, String> expectedRecord = expectedRecords.get(i);
Record actualRecord = actualRecords.get(i);
assertEquals(expectedRecord.get("id"), actualRecord.getValue("id").toString());
assertEquals(expectedRecord.get("price"), actualRecord.getValue("price").toString());
}
}
private AvroReaderWithExplicitSchema createAvroReaderWithExplicitSchema(String avroFilePath, String avroSchemaPath) throws IOException {
FileInputStream avroFileInputStream = new FileInputStream(avroFilePath);
Schema avroSchema = new Schema.Parser().parse(new FileInputStream(avroSchemaPath));
RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
return new AvroReaderWithExplicitSchema(avroFileInputStream, recordSchema, avroSchema);
}
private List<Record> collectRecords(RecordReader reader) throws IOException, MalformedRecordException {
List<Record> records = new ArrayList<>();
Record record = reader.nextRecord();
while (record != null) {
records.add(record);
record = reader.nextRecord();
}
return records;
}
}

View File

@ -0,0 +1,22 @@
{
"type": "record",
"name": "DecimalTest",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "price",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 20,
"scale": 6
}
]
}
]
}