mirror of https://github.com/apache/nifi.git
NIFI-4215 Allow Complex Avro Schema Parsing
NiFi can now parse an Avro schema of a record that references an already-defined record, including itself.

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2034.
This commit is contained in:
parent
0bb1411532
commit
cf49a58ee7
|
@ -32,8 +32,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||||
|
|
||||||
public class SimpleRecordSchema implements RecordSchema {
|
public class SimpleRecordSchema implements RecordSchema {
|
||||||
private final List<RecordField> fields;
|
private List<RecordField> fields = null;
|
||||||
private final Map<String, Integer> fieldIndices;
|
private Map<String, Integer> fieldIndices = null;
|
||||||
private final boolean textAvailable;
|
private final boolean textAvailable;
|
||||||
private final String text;
|
private final String text;
|
||||||
private final String schemaFormat;
|
private final String schemaFormat;
|
||||||
|
@ -47,34 +47,24 @@ public class SimpleRecordSchema implements RecordSchema {
|
||||||
this(fields, createText(fields), null, false, id);
|
this(fields, createText(fields), null, false, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SimpleRecordSchema(final String text, final String schemaFormat, final SchemaIdentifier id) {
|
||||||
|
this(text, schemaFormat, true, id);
|
||||||
|
}
|
||||||
|
|
||||||
public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
|
public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
|
||||||
this(fields, text, schemaFormat, true, id);
|
this(fields, text, schemaFormat, true, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
|
private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
|
||||||
|
this(text, schemaFormat, textAvailable, id);
|
||||||
|
setFields(fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SimpleRecordSchema(final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
|
||||||
this.text = text;
|
this.text = text;
|
||||||
this.schemaFormat = schemaFormat;
|
this.schemaFormat = schemaFormat;
|
||||||
this.schemaIdentifier = id;
|
this.schemaIdentifier = id;
|
||||||
this.textAvailable = textAvailable;
|
this.textAvailable = textAvailable;
|
||||||
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
|
|
||||||
this.fieldIndices = new HashMap<>(fields.size());
|
|
||||||
|
|
||||||
int index = 0;
|
|
||||||
for (final RecordField field : fields) {
|
|
||||||
Integer previousValue = fieldIndices.put(field.getFieldName(), index);
|
|
||||||
if (previousValue != null) {
|
|
||||||
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final String alias : field.getAliases()) {
|
|
||||||
previousValue = fieldIndices.put(alias, index);
|
|
||||||
if (previousValue != null) {
|
|
||||||
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
index++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -97,6 +87,33 @@ public class SimpleRecordSchema implements RecordSchema {
|
||||||
return fields;
|
return fields;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setFields(final List<RecordField> fields) {
|
||||||
|
|
||||||
|
if (this.fields != null) {
|
||||||
|
throw new IllegalArgumentException("Fields have already been set.");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
|
||||||
|
this.fieldIndices = new HashMap<>(fields.size());
|
||||||
|
|
||||||
|
int index = 0;
|
||||||
|
for (final RecordField field : fields) {
|
||||||
|
Integer previousValue = fieldIndices.put(field.getFieldName(), index);
|
||||||
|
if (previousValue != null) {
|
||||||
|
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final String alias : field.getAliases()) {
|
||||||
|
previousValue = fieldIndices.put(alias, index);
|
||||||
|
if (previousValue != null) {
|
||||||
|
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getFieldCount() {
|
public int getFieldCount() {
|
||||||
return fields.size();
|
return fields.size();
|
||||||
|
|
|
@ -218,6 +218,15 @@ public class AvroTypeUtil {
|
||||||
* @return a Data Type that corresponds to the given Avro Schema
|
* @return a Data Type that corresponds to the given Avro Schema
|
||||||
*/
|
*/
|
||||||
public static DataType determineDataType(final Schema avroSchema) {
|
public static DataType determineDataType(final Schema avroSchema) {
|
||||||
|
return determineDataType(avroSchema, new HashMap<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DataType determineDataType(final Schema avroSchema, Map<String, DataType> knownRecordTypes) {
|
||||||
|
|
||||||
|
if (knownRecordTypes == null) {
|
||||||
|
throw new IllegalArgumentException("'knownRecordTypes' cannot be null.");
|
||||||
|
}
|
||||||
|
|
||||||
final Type avroType = avroSchema.getType();
|
final Type avroType = avroSchema.getType();
|
||||||
|
|
||||||
final LogicalType logicalType = avroSchema.getLogicalType();
|
final LogicalType logicalType = avroSchema.getLogicalType();
|
||||||
|
@ -241,7 +250,7 @@ public class AvroTypeUtil {
|
||||||
|
|
||||||
switch (avroType) {
|
switch (avroType) {
|
||||||
case ARRAY:
|
case ARRAY:
|
||||||
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
|
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType(), knownRecordTypes));
|
||||||
case BYTES:
|
case BYTES:
|
||||||
case FIXED:
|
case FIXED:
|
||||||
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
|
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
|
||||||
|
@ -259,40 +268,50 @@ public class AvroTypeUtil {
|
||||||
case LONG:
|
case LONG:
|
||||||
return RecordFieldType.LONG.getDataType();
|
return RecordFieldType.LONG.getDataType();
|
||||||
case RECORD: {
|
case RECORD: {
|
||||||
final List<Field> avroFields = avroSchema.getFields();
|
String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
|
||||||
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
|
|
||||||
|
|
||||||
for (final Field field : avroFields) {
|
if (knownRecordTypes.containsKey(schemaFullName)) {
|
||||||
final String fieldName = field.name();
|
return knownRecordTypes.get(schemaFullName);
|
||||||
final Schema fieldSchema = field.schema();
|
} else {
|
||||||
final DataType fieldType = determineDataType(fieldSchema);
|
SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
|
||||||
|
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
|
||||||
|
knownRecordTypes.put(schemaFullName, recordSchemaType);
|
||||||
|
|
||||||
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
|
final List<Field> avroFields = avroSchema.getFields();
|
||||||
recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
|
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
|
||||||
} else {
|
|
||||||
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
|
for (final Field field : avroFields) {
|
||||||
|
final String fieldName = field.name();
|
||||||
|
final Schema fieldSchema = field.schema();
|
||||||
|
final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes);
|
||||||
|
|
||||||
|
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
|
||||||
|
recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
|
||||||
|
} else {
|
||||||
|
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
|
recordSchema.setFields(recordFields);
|
||||||
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
|
return recordSchemaType;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case NULL:
|
case NULL:
|
||||||
return RecordFieldType.STRING.getDataType();
|
return RecordFieldType.STRING.getDataType();
|
||||||
case MAP:
|
case MAP:
|
||||||
final Schema valueSchema = avroSchema.getValueType();
|
final Schema valueSchema = avroSchema.getValueType();
|
||||||
final DataType valueType = determineDataType(valueSchema);
|
final DataType valueType = determineDataType(valueSchema, knownRecordTypes);
|
||||||
return RecordFieldType.MAP.getMapDataType(valueType);
|
return RecordFieldType.MAP.getMapDataType(valueType);
|
||||||
case UNION: {
|
case UNION: {
|
||||||
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
|
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
|
||||||
|
|
||||||
if (nonNullSubSchemas.size() == 1) {
|
if (nonNullSubSchemas.size() == 1) {
|
||||||
return determineDataType(nonNullSubSchemas.get(0));
|
return determineDataType(nonNullSubSchemas.get(0), knownRecordTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
|
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
|
||||||
for (final Schema subSchema : nonNullSubSchemas) {
|
for (final Schema subSchema : nonNullSubSchemas) {
|
||||||
final DataType childDataType = determineDataType(subSchema);
|
final DataType childDataType = determineDataType(subSchema, knownRecordTypes);
|
||||||
possibleChildTypes.add(childDataType);
|
possibleChildTypes.add(childDataType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,10 +353,16 @@ public class AvroTypeUtil {
|
||||||
throw new IllegalArgumentException("Avro Schema cannot be null");
|
throw new IllegalArgumentException("Avro Schema cannot be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
|
||||||
|
SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
|
||||||
|
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
|
||||||
|
Map<String, DataType> knownRecords = new HashMap<>();
|
||||||
|
knownRecords.put(schemaFullName, recordSchemaType);
|
||||||
|
|
||||||
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
|
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
|
||||||
for (final Field field : avroSchema.getFields()) {
|
for (final Field field : avroSchema.getFields()) {
|
||||||
final String fieldName = field.name();
|
final String fieldName = field.name();
|
||||||
final DataType dataType = AvroTypeUtil.determineDataType(field.schema());
|
final DataType dataType = AvroTypeUtil.determineDataType(field.schema(), knownRecords);
|
||||||
|
|
||||||
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
|
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
|
||||||
recordFields.add(new RecordField(fieldName, dataType, field.aliases()));
|
recordFields.add(new RecordField(fieldName, dataType, field.aliases()));
|
||||||
|
@ -346,7 +371,7 @@ public class AvroTypeUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, schemaText, AVRO_SCHEMA_FORMAT, schemaId);
|
recordSchema.setFields(recordFields);
|
||||||
return recordSchema;
|
return recordSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.Schema.Field;
|
import org.apache.avro.Schema.Field;
|
||||||
|
@ -33,6 +34,8 @@ import org.apache.nifi.serialization.record.DataType;
|
||||||
import org.apache.nifi.serialization.record.RecordField;
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestAvroTypeUtil {
|
public class TestAvroTypeUtil {
|
||||||
|
@ -100,4 +103,140 @@ public class TestAvroTypeUtil {
|
||||||
assertEquals(Collections.singleton("greeting"), stringField.getAliases());
|
assertEquals(Collections.singleton("greeting"), stringField.getAliases());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
// Simple recursion is a record A composing itself (similar to a LinkedList Node referencing 'next')
|
||||||
|
public void testSimpleRecursiveSchema() {
|
||||||
|
Schema recursiveSchema = new Schema.Parser().parse(
|
||||||
|
"{\n" +
|
||||||
|
" \"namespace\": \"org.apache.nifi.testing\",\n" +
|
||||||
|
" \"name\": \"NodeRecord\",\n" +
|
||||||
|
" \"type\": \"record\",\n" +
|
||||||
|
" \"fields\": [\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"id\",\n" +
|
||||||
|
" \"type\": \"int\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"value\",\n" +
|
||||||
|
" \"type\": \"string\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"parent\",\n" +
|
||||||
|
" \"type\": [\n" +
|
||||||
|
" \"null\",\n" +
|
||||||
|
" \"NodeRecord\"\n" +
|
||||||
|
" ]\n" +
|
||||||
|
" }\n" +
|
||||||
|
" ]\n" +
|
||||||
|
"}\n"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Make sure the following doesn't throw an exception
|
||||||
|
RecordSchema result = AvroTypeUtil.createSchema(recursiveSchema);
|
||||||
|
|
||||||
|
// Make sure it parsed correctly
|
||||||
|
Assert.assertEquals(3, result.getFieldCount());
|
||||||
|
|
||||||
|
Optional<RecordField> idField = result.getField("id");
|
||||||
|
Assert.assertTrue(idField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.INT, idField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> valueField = result.getField("value");
|
||||||
|
Assert.assertTrue(valueField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.STRING, valueField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> parentField = result.getField("parent");
|
||||||
|
Assert.assertTrue(parentField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.RECORD, parentField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
// The 'parent' field should have a circular schema reference to the top level record schema, similar to how Avro handles this
|
||||||
|
Assert.assertEquals(result, ((RecordDataType)parentField.get().getDataType()).getChildSchema());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
// Complicated recursion is a record A composing record B, who composes a record A
|
||||||
|
public void testComplicatedRecursiveSchema() {
|
||||||
|
Schema recursiveSchema = new Schema.Parser().parse(
|
||||||
|
"{\n" +
|
||||||
|
" \"namespace\": \"org.apache.nifi.testing\",\n" +
|
||||||
|
" \"name\": \"Record_A\",\n" +
|
||||||
|
" \"type\": \"record\",\n" +
|
||||||
|
" \"fields\": [\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"id\",\n" +
|
||||||
|
" \"type\": \"int\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"value\",\n" +
|
||||||
|
" \"type\": \"string\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"child\",\n" +
|
||||||
|
" \"type\": {\n" +
|
||||||
|
" \"namespace\": \"org.apache.nifi.testing\",\n" +
|
||||||
|
" \"name\": \"Record_B\",\n" +
|
||||||
|
" \"type\": \"record\",\n" +
|
||||||
|
" \"fields\": [\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"id\",\n" +
|
||||||
|
" \"type\": \"int\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"value\",\n" +
|
||||||
|
" \"type\": \"string\"\n" +
|
||||||
|
" },\n" +
|
||||||
|
" {\n" +
|
||||||
|
" \"name\": \"parent\",\n" +
|
||||||
|
" \"type\": [\n" +
|
||||||
|
" \"null\",\n" +
|
||||||
|
" \"Record_A\"\n" +
|
||||||
|
" ]\n" +
|
||||||
|
" }\n" +
|
||||||
|
" ]\n" +
|
||||||
|
" }\n" +
|
||||||
|
" }\n" +
|
||||||
|
" ]\n" +
|
||||||
|
"}\n"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Make sure the following doesn't throw an exception
|
||||||
|
RecordSchema recordASchema = AvroTypeUtil.createSchema(recursiveSchema);
|
||||||
|
|
||||||
|
// Make sure it parsed correctly
|
||||||
|
Assert.assertEquals(3, recordASchema.getFieldCount());
|
||||||
|
|
||||||
|
Optional<RecordField> recordAIdField = recordASchema.getField("id");
|
||||||
|
Assert.assertTrue(recordAIdField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.INT, recordAIdField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> recordAValueField = recordASchema.getField("value");
|
||||||
|
Assert.assertTrue(recordAValueField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.STRING, recordAValueField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> recordAChildField = recordASchema.getField("child");
|
||||||
|
Assert.assertTrue(recordAChildField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.RECORD, recordAChildField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
// Get the child schema
|
||||||
|
RecordSchema recordBSchema = ((RecordDataType)recordAChildField.get().getDataType()).getChildSchema();
|
||||||
|
|
||||||
|
// Make sure it parsed correctly
|
||||||
|
Assert.assertEquals(3, recordBSchema.getFieldCount());
|
||||||
|
|
||||||
|
Optional<RecordField> recordBIdField = recordBSchema.getField("id");
|
||||||
|
Assert.assertTrue(recordBIdField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.INT, recordBIdField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> recordBValueField = recordBSchema.getField("value");
|
||||||
|
Assert.assertTrue(recordBValueField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.STRING, recordBValueField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
Optional<RecordField> recordBParentField = recordBSchema.getField("parent");
|
||||||
|
Assert.assertTrue(recordBParentField.isPresent());
|
||||||
|
Assert.assertEquals(RecordFieldType.RECORD, recordBParentField.get().getDataType().getFieldType());
|
||||||
|
|
||||||
|
// Make sure the 'parent' field has a schema reference back to the original top level record schema
|
||||||
|
Assert.assertEquals(recordASchema, ((RecordDataType)recordBParentField.get().getDataType()).getChildSchema());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue