NIFI-4215 Revert Complex Avro Schema Changes

This reverts commit cf49a58ee7.
This commit is contained in:
James Wing 2017-08-01 21:03:04 -07:00
parent bcf60aa556
commit 2502b79bae
3 changed files with 40 additions and 221 deletions

View File

@ -32,8 +32,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.SchemaIdentifier;
public class SimpleRecordSchema implements RecordSchema { public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null; private final List<RecordField> fields;
private Map<String, Integer> fieldIndices = null; private final Map<String, Integer> fieldIndices;
private final boolean textAvailable; private final boolean textAvailable;
private final String text; private final String text;
private final String schemaFormat; private final String schemaFormat;
@ -47,24 +47,34 @@ public class SimpleRecordSchema implements RecordSchema {
this(fields, createText(fields), null, false, id); this(fields, createText(fields), null, false, id);
} }
public SimpleRecordSchema(final String text, final String schemaFormat, final SchemaIdentifier id) {
this(text, schemaFormat, true, id);
}
public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) { public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
this(fields, text, schemaFormat, true, id); this(fields, text, schemaFormat, true, id);
} }
private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) { private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
this(text, schemaFormat, textAvailable, id);
setFields(fields);
}
private SimpleRecordSchema(final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
this.text = text; this.text = text;
this.schemaFormat = schemaFormat; this.schemaFormat = schemaFormat;
this.schemaIdentifier = id; this.schemaIdentifier = id;
this.textAvailable = textAvailable; this.textAvailable = textAvailable;
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.fieldIndices = new HashMap<>(fields.size());
int index = 0;
for (final RecordField field : fields) {
Integer previousValue = fieldIndices.put(field.getFieldName(), index);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
for (final String alias : field.getAliases()) {
previousValue = fieldIndices.put(alias, index);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
}
index++;
}
} }
@Override @Override
@ -87,33 +97,6 @@ public class SimpleRecordSchema implements RecordSchema {
return fields; return fields;
} }
public void setFields(final List<RecordField> fields) {
if (this.fields != null) {
throw new IllegalArgumentException("Fields have already been set.");
}
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.fieldIndices = new HashMap<>(fields.size());
int index = 0;
for (final RecordField field : fields) {
Integer previousValue = fieldIndices.put(field.getFieldName(), index);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
for (final String alias : field.getAliases()) {
previousValue = fieldIndices.put(alias, index);
if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
}
}
index++;
}
}
@Override @Override
public int getFieldCount() { public int getFieldCount() {
return fields.size(); return fields.size();

View File

@ -218,15 +218,6 @@ public class AvroTypeUtil {
* @return a Data Type that corresponds to the given Avro Schema * @return a Data Type that corresponds to the given Avro Schema
*/ */
public static DataType determineDataType(final Schema avroSchema) { public static DataType determineDataType(final Schema avroSchema) {
return determineDataType(avroSchema, new HashMap<>());
}
public static DataType determineDataType(final Schema avroSchema, Map<String, DataType> knownRecordTypes) {
if (knownRecordTypes == null) {
throw new IllegalArgumentException("'knownRecordTypes' cannot be null.");
}
final Type avroType = avroSchema.getType(); final Type avroType = avroSchema.getType();
final LogicalType logicalType = avroSchema.getLogicalType(); final LogicalType logicalType = avroSchema.getLogicalType();
@ -250,7 +241,7 @@ public class AvroTypeUtil {
switch (avroType) { switch (avroType) {
case ARRAY: case ARRAY:
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType(), knownRecordTypes)); return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
case BYTES: case BYTES:
case FIXED: case FIXED:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
@ -268,50 +259,40 @@ public class AvroTypeUtil {
case LONG: case LONG:
return RecordFieldType.LONG.getDataType(); return RecordFieldType.LONG.getDataType();
case RECORD: { case RECORD: {
String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); final List<Field> avroFields = avroSchema.getFields();
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
if (knownRecordTypes.containsKey(schemaFullName)) { for (final Field field : avroFields) {
return knownRecordTypes.get(schemaFullName); final String fieldName = field.name();
} else { final Schema fieldSchema = field.schema();
SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); final DataType fieldType = determineDataType(fieldSchema);
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
knownRecordTypes.put(schemaFullName, recordSchemaType);
final List<Field> avroFields = avroSchema.getFields(); if (field.defaultVal() == JsonProperties.NULL_VALUE) {
final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
} else {
for (final Field field : avroFields) { recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
final String fieldName = field.name();
final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes);
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
} else {
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
}
} }
recordSchema.setFields(recordFields);
return recordSchemaType;
} }
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
} }
case NULL: case NULL:
return RecordFieldType.STRING.getDataType(); return RecordFieldType.STRING.getDataType();
case MAP: case MAP:
final Schema valueSchema = avroSchema.getValueType(); final Schema valueSchema = avroSchema.getValueType();
final DataType valueType = determineDataType(valueSchema, knownRecordTypes); final DataType valueType = determineDataType(valueSchema);
return RecordFieldType.MAP.getMapDataType(valueType); return RecordFieldType.MAP.getMapDataType(valueType);
case UNION: { case UNION: {
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema); final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
if (nonNullSubSchemas.size() == 1) { if (nonNullSubSchemas.size() == 1) {
return determineDataType(nonNullSubSchemas.get(0), knownRecordTypes); return determineDataType(nonNullSubSchemas.get(0));
} }
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
for (final Schema subSchema : nonNullSubSchemas) { for (final Schema subSchema : nonNullSubSchemas) {
final DataType childDataType = determineDataType(subSchema, knownRecordTypes); final DataType childDataType = determineDataType(subSchema);
possibleChildTypes.add(childDataType); possibleChildTypes.add(childDataType);
} }
@ -353,16 +334,10 @@ public class AvroTypeUtil {
throw new IllegalArgumentException("Avro Schema cannot be null"); throw new IllegalArgumentException("Avro Schema cannot be null");
} }
String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
Map<String, DataType> knownRecords = new HashMap<>();
knownRecords.put(schemaFullName, recordSchemaType);
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
for (final Field field : avroSchema.getFields()) { for (final Field field : avroSchema.getFields()) {
final String fieldName = field.name(); final String fieldName = field.name();
final DataType dataType = AvroTypeUtil.determineDataType(field.schema(), knownRecords); final DataType dataType = AvroTypeUtil.determineDataType(field.schema());
if (field.defaultVal() == JsonProperties.NULL_VALUE) { if (field.defaultVal() == JsonProperties.NULL_VALUE) {
recordFields.add(new RecordField(fieldName, dataType, field.aliases())); recordFields.add(new RecordField(fieldName, dataType, field.aliases()));
@ -371,7 +346,7 @@ public class AvroTypeUtil {
} }
} }
recordSchema.setFields(recordFields); final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, schemaText, AVRO_SCHEMA_FORMAT, schemaId);
return recordSchema; return recordSchema;
} }

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Field;
@ -34,8 +33,6 @@ import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestAvroTypeUtil { public class TestAvroTypeUtil {
@ -103,140 +100,4 @@ public class TestAvroTypeUtil {
assertEquals(Collections.singleton("greeting"), stringField.getAliases()); assertEquals(Collections.singleton("greeting"), stringField.getAliases());
} }
@Test
// Simple recursion is a record A composing itself (similar to a LinkedList Node referencing 'next')
public void testSimpleRecursiveSchema() {
Schema recursiveSchema = new Schema.Parser().parse(
"{\n" +
" \"namespace\": \"org.apache.nifi.testing\",\n" +
" \"name\": \"NodeRecord\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"id\",\n" +
" \"type\": \"int\"\n" +
" },\n" +
" {\n" +
" \"name\": \"value\",\n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"name\": \"parent\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"NodeRecord\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}\n"
);
// Make sure the following doesn't throw an exception
RecordSchema result = AvroTypeUtil.createSchema(recursiveSchema);
// Make sure it parsed correctly
Assert.assertEquals(3, result.getFieldCount());
Optional<RecordField> idField = result.getField("id");
Assert.assertTrue(idField.isPresent());
Assert.assertEquals(RecordFieldType.INT, idField.get().getDataType().getFieldType());
Optional<RecordField> valueField = result.getField("value");
Assert.assertTrue(valueField.isPresent());
Assert.assertEquals(RecordFieldType.STRING, valueField.get().getDataType().getFieldType());
Optional<RecordField> parentField = result.getField("parent");
Assert.assertTrue(parentField.isPresent());
Assert.assertEquals(RecordFieldType.RECORD, parentField.get().getDataType().getFieldType());
// The 'parent' field should have a circular schema reference to the top level record schema, similar to how Avro handles this
Assert.assertEquals(result, ((RecordDataType)parentField.get().getDataType()).getChildSchema());
}
@Test
// Complicated recursion is a record A composing record B, who composes a record A
public void testComplicatedRecursiveSchema() {
Schema recursiveSchema = new Schema.Parser().parse(
"{\n" +
" \"namespace\": \"org.apache.nifi.testing\",\n" +
" \"name\": \"Record_A\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"id\",\n" +
" \"type\": \"int\"\n" +
" },\n" +
" {\n" +
" \"name\": \"value\",\n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"name\": \"child\",\n" +
" \"type\": {\n" +
" \"namespace\": \"org.apache.nifi.testing\",\n" +
" \"name\": \"Record_B\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"id\",\n" +
" \"type\": \"int\"\n" +
" },\n" +
" {\n" +
" \"name\": \"value\",\n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"name\": \"parent\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"Record_A\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
" }\n" +
" ]\n" +
"}\n"
);
// Make sure the following doesn't throw an exception
RecordSchema recordASchema = AvroTypeUtil.createSchema(recursiveSchema);
// Make sure it parsed correctly
Assert.assertEquals(3, recordASchema.getFieldCount());
Optional<RecordField> recordAIdField = recordASchema.getField("id");
Assert.assertTrue(recordAIdField.isPresent());
Assert.assertEquals(RecordFieldType.INT, recordAIdField.get().getDataType().getFieldType());
Optional<RecordField> recordAValueField = recordASchema.getField("value");
Assert.assertTrue(recordAValueField.isPresent());
Assert.assertEquals(RecordFieldType.STRING, recordAValueField.get().getDataType().getFieldType());
Optional<RecordField> recordAChildField = recordASchema.getField("child");
Assert.assertTrue(recordAChildField.isPresent());
Assert.assertEquals(RecordFieldType.RECORD, recordAChildField.get().getDataType().getFieldType());
// Get the child schema
RecordSchema recordBSchema = ((RecordDataType)recordAChildField.get().getDataType()).getChildSchema();
// Make sure it parsed correctly
Assert.assertEquals(3, recordBSchema.getFieldCount());
Optional<RecordField> recordBIdField = recordBSchema.getField("id");
Assert.assertTrue(recordBIdField.isPresent());
Assert.assertEquals(RecordFieldType.INT, recordBIdField.get().getDataType().getFieldType());
Optional<RecordField> recordBValueField = recordBSchema.getField("value");
Assert.assertTrue(recordBValueField.isPresent());
Assert.assertEquals(RecordFieldType.STRING, recordBValueField.get().getDataType().getFieldType());
Optional<RecordField> recordBParentField = recordBSchema.getField("parent");
Assert.assertTrue(recordBParentField.isPresent());
Assert.assertEquals(RecordFieldType.RECORD, recordBParentField.get().getDataType().getFieldType());
// Make sure the 'parent' field has a schema reference back to the original top level record schema
Assert.assertEquals(recordASchema, ((RecordDataType)recordBParentField.get().getDataType()).getChildSchema());
}
} }