NIFI-7843 Recursive avro schemas fail to write with RecordWriter

NIFI-7843 Recursive avro schemas fail to write with RecordWriter
Add new test case to TestSimpleRecordSchema to test the scenario
when schema name and schema namespace match.

This closes #4550.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Denes Arvay 2020-09-23 11:58:52 +02:00 committed by Peter Turcsanyi
parent 9c83908c9c
commit f73a019f36
7 changed files with 59 additions and 0 deletions

View File

@ -167,6 +167,11 @@ public class SimpleRecordSchema implements RecordSchema {
}
final RecordSchema other = (RecordSchema) obj;
if (getSchemaNamespace().isPresent() && getSchemaNamespace().equals(other.getSchemaNamespace())
&& getSchemaName().isPresent() && getSchemaName().equals(other.getSchemaName())) {
return true;
}
return fields.equals(other.getFields());
}

View File

@ -22,8 +22,10 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -90,6 +92,24 @@ public class TestSimpleRecordSchema {
assertTrue(secondSchema.equals(schema));
}
@Test
public void testFieldsArentCheckedInEqualsIfNameAndNamespaceMatch() {
final RecordField testField = new RecordField("test", RecordFieldType.STRING.getDataType());
final SimpleRecordSchema schema1 = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
schema1.setSchemaName("name");
schema1.setSchemaNamespace("namespace");
schema1.setFields(Collections.singletonList(testField));
SimpleRecordSchema schema2 = Mockito.spy(new SimpleRecordSchema(SchemaIdentifier.EMPTY));
schema2.setSchemaName("name");
schema2.setSchemaNamespace("namespace");
schema2.setFields(Collections.singletonList(testField));
assertTrue(schema1.equals(schema2));
Mockito.verify(schema2, Mockito.never()).getFields();
}
private Set<String> set(final String... values) {
final Set<String> set = new HashSet<>();
for (final String value : values) {

View File

@ -378,6 +378,8 @@ public class AvroTypeUtil {
return knownRecordTypes.get(schemaFullName);
} else {
SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
recordSchema.setSchemaName(avroSchema.getName());
recordSchema.setSchemaNamespace(avroSchema.getNamespace());
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
knownRecordTypes.put(schemaFullName, recordSchemaType);

View File

@ -140,6 +140,7 @@
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/avro/simple.avsc</exclude>
<exclude>src/test/resources/avro/recursive.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>

View File

@ -39,6 +39,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -73,6 +74,26 @@ public abstract class TestWriteAvroResult {
protected void verify(final WriteResult writeResult) {
}
@Test
public void testWriteRecursiveRecord() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/recursive.avsc"));
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
final FileInputStream in = new FileInputStream("src/test/resources/avro/recursive.avro");
try (final AvroRecordReader reader = new AvroReaderWithExplicitSchema(in, recordSchema, schema);
final RecordSetWriter writer = createWriter(schema, new ByteArrayOutputStream())) {
final GenericRecord avroRecord = reader.nextAvroRecord();
final Map<String, Object> recordMap = AvroTypeUtil.convertAvroRecordToMap(avroRecord, recordSchema);
final Record record = new MapRecord(recordSchema, recordMap);
try {
writer.write(record);
} catch (StackOverflowError soe) {
Assert.fail("Recursive schema resulted in infinite loop during write");
}
}
}
@Test
public void testWriteRecord() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));

View File

@ -0,0 +1,10 @@
{
"namespace": "nifi",
"type": "record",
"name": "Recursive",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": ["string", "null"]},
{"name": "parent", "type": ["Recursive", "null"]}
]
}