NIFI-6419: Fixed AvroWriter single record with external schema results in data loss

This closes #3573.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Peter Turcsanyi 2019-07-04 18:48:36 +02:00 committed by Koji Kawamura
parent 4783b12a9c
commit 24e50953a3
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
5 changed files with 109 additions and 2 deletions

View File

@ -136,6 +136,7 @@
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/avro/simple.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>

View File

@ -26,6 +26,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@ -66,9 +67,77 @@ public abstract class TestWriteAvroResult {
protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException;
protected abstract List<GenericRecord> readRecords(InputStream in, Schema schema, int recordCount) throws IOException;
protected void verify(final WriteResult writeResult) {
}
@Test
public void testWriteRecord() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("msg", "nifi");
final Record record = new MapRecord(recordSchema, values);
try (final RecordSetWriter writer = createWriter(schema, baos)) {
writer.write(record);
}
final byte[] data = baos.toByteArray();
try (final InputStream in = new ByteArrayInputStream(data)) {
final GenericRecord avroRecord = readRecord(in, schema);
assertNotNull(avroRecord);
assertNotNull(avroRecord.get("msg"));
assertEquals("nifi", avroRecord.get("msg").toString());
}
}
@Test
public void testWriteRecordSet() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final int recordCount = 3;
List<Record> records = new ArrayList<>();
for (int i = 0; i < recordCount; i++){
final Map<String, Object> values = new HashMap<>();
values.put("msg", "nifi" + i);
final Record record = new MapRecord(recordSchema, values);
records.add(record);
}
try (final RecordSetWriter writer = createWriter(schema, baos)) {
writer.write(new ListRecordSet(recordSchema, records));
}
final byte[] data = baos.toByteArray();
try (final InputStream in = new ByteArrayInputStream(data)) {
final List<GenericRecord> avroRecords = readRecords(in, schema, recordCount);
for (int i = 0; i < recordCount; i++) {
final GenericRecord avroRecord = avroRecords.get(i);
assertNotNull(avroRecord);
assertNotNull(avroRecord.get("msg"));
assertEquals("nifi" + i, avroRecord.get("msg").toString());
}
}
}
@Test
public void testLogicalTypes() throws IOException, ParseException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));

View File

@ -20,6 +20,8 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@ -39,11 +41,25 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult {
@Override
protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
final Schema avroSchema = dataFileStream.getSchema();
GenericData.setStringType(avroSchema, StringType.String);
final GenericRecord avroRecord = dataFileStream.next();
return avroRecord;
}
@Override
protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
final Schema avroSchema = dataFileStream.getSchema();
GenericData.setStringType(avroSchema, StringType.String);
List<GenericRecord> records = new ArrayList<>();
for (int i = 0; i < recordCount; i++) {
records.add(dataFileStream.next());
}
return records;
}
}

View File

@ -65,10 +65,23 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
@Override
protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, decoder);
}
@Override
protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
List<GenericRecord> records = new ArrayList<>();
for (int i = 0; i < recordCount; i++) {
records.add(reader.read(null, decoder));
}
return records;
}
@Override
protected void verify(final WriteResult writeResult) {
final Map<String, String> attributes = writeResult.getAttributes();

View File

@ -0,0 +1,8 @@
{
"namespace": "nifi",
"name": "simple",
"type": "record",
"fields": [
{"name": "msg", "type": "string"}
]
}