NIFI-4029: Allow null Avro default values in HortonworksSchemaRegistry

This closes #1894.

Signed-off-by: Bryan Bende <bbende@apache.org>
Author: Steve Champagne, 2017-06-07 12:36:28 +00:00
Committed by: Bryan Bende
Parent: c86190c513
Commit: 45e035686f
GPG Key ID: A0DDA9ED50711C39 (no known key found for this signature in database)

3 changed files with 13 additions and 119 deletions
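
Background for the change: when an Avro field is declared with "default": null, Avro's Schema.Field#defaultVal() does not return Java null; it returns the sentinel org.apache.avro.JsonProperties.NULL_VALUE. Handing that sentinel to the NiFi RecordField constructor as if it were a real default value is what the patch below guards against. A minimal sketch of the Avro-side behavior, assuming an Avro 1.8.x Schema.Parser on the classpath (the schema text and class name are illustrative):

    import org.apache.avro.JsonProperties;
    import org.apache.avro.Schema;

    public class NullDefaultProbe {
        public static void main(String[] args) {
            // A nullable field whose declared Avro default is null.
            final String schemaText = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
                    + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
            final Schema schema = new Schema.Parser().parse(schemaText);
            final Object defaultVal = schema.getField("name").defaultVal();

            // The null default is reported as a sentinel object rather than Java null.
            System.out.println(defaultVal != null);                      // true
            System.out.println(defaultVal == JsonProperties.NULL_VALUE); // true: the condition the patch checks
        }
    }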

View File

@@ -152,7 +152,11 @@ public class AvroTypeUtil {
             final Schema fieldSchema = field.schema();
             final DataType fieldType = determineDataType(fieldSchema);
-            recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+            if (field.defaultVal() == JsonProperties.NULL_VALUE) {
+                recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
+            } else {
+                recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+            }
         }
 
         final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
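
Read as ordinary code rather than a diff, the patched loop body amounts to the following hypothetical helper (not part of the commit); it uses only the two RecordField constructor overloads visible in the hunk above, one with and one without an explicit default value:

    import org.apache.avro.JsonProperties;
    import org.apache.avro.Schema;
    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordField;

    // Hypothetical helper mirroring the patched loop body; not part of the commit.
    final class RecordFieldFactory {
        static RecordField toRecordField(final Schema.Field field, final DataType fieldType) {
            if (field.defaultVal() == JsonProperties.NULL_VALUE) {
                // The Avro default was literally null, so omit the default value entirely.
                return new RecordField(field.name(), fieldType, field.aliases());
            }
            return new RecordField(field.name(), fieldType, field.defaultVal(), field.aliases());
        }
    }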

View File

@@ -41,6 +41,10 @@ limitations under the License.
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>

View File

@@ -25,12 +25,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -42,10 +39,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.util.Tuple;
@@ -261,7 +254,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return createRecordSchema(schema, schemaText, schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
         });
     }
@@ -309,120 +302,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return createRecordSchema(schema, schemaText, schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
         });
     }
 
-    /**
-     * Converts an Avro Schema to a RecordSchema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @param text the textual representation of the schema
-     * @param schemaId the id of the schema
-     * @return the Corresponding Record Schema
-     */
-    private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) {
-        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
-        for (final Field field : avroSchema.getFields()) {
-            final String fieldName = field.name();
-            final DataType dataType = determineDataType(field.schema());
-            recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
-        }
-
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId);
-        return recordSchema;
-    }
-
-    /**
-     * Returns a DataType for the given Avro Schema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @return a Data Type that corresponds to the given Avro Schema
-     */
-    private DataType determineDataType(final Schema avroSchema) {
-        final Type avroType = avroSchema.getType();
-
-        final LogicalType logicalType = avroSchema.getLogicalType();
-        if (logicalType != null) {
-            final String logicalTypeName = logicalType.getName();
-            switch (logicalTypeName) {
-                case LOGICAL_TYPE_DATE:
-                    return RecordFieldType.DATE.getDataType();
-                case LOGICAL_TYPE_TIME_MILLIS:
-                case LOGICAL_TYPE_TIME_MICROS:
-                    return RecordFieldType.TIME.getDataType();
-                case LOGICAL_TYPE_TIMESTAMP_MILLIS:
-                case LOGICAL_TYPE_TIMESTAMP_MICROS:
-                    return RecordFieldType.TIMESTAMP.getDataType();
-            }
-        }
-
-        switch (avroType) {
-            case ARRAY:
-                return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
-            case BYTES:
-            case FIXED:
-                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case BOOLEAN:
-                return RecordFieldType.BOOLEAN.getDataType();
-            case DOUBLE:
-                return RecordFieldType.DOUBLE.getDataType();
-            case ENUM:
-            case STRING:
-                return RecordFieldType.STRING.getDataType();
-            case FLOAT:
-                return RecordFieldType.FLOAT.getDataType();
-            case INT:
-                return RecordFieldType.INT.getDataType();
-            case LONG:
-                return RecordFieldType.LONG.getDataType();
-            case RECORD: {
-                final List<Field> avroFields = avroSchema.getFields();
-                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
-                for (final Field field : avroFields) {
-                    final String fieldName = field.name();
-                    final Schema fieldSchema = field.schema();
-                    final DataType fieldType = determineDataType(fieldSchema);
-                    recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
-                }
-
-                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY);
-                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            }
-            case NULL:
-                return RecordFieldType.STRING.getDataType();
-            case MAP:
-                final Schema valueSchema = avroSchema.getValueType();
-                final DataType valueType = determineDataType(valueSchema);
-                return RecordFieldType.MAP.getMapDataType(valueType);
-            case UNION: {
-                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
-
-                if (nonNullSubSchemas.size() == 1) {
-                    return determineDataType(nonNullSubSchemas.get(0));
-                }
-
-                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
-                for (final Schema subSchema : nonNullSubSchemas) {
-                    final DataType childDataType = determineDataType(subSchema);
-                    possibleChildTypes.add(childDataType);
-                }
-
-                return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
-            }
-        }
-
-        return null;
-    }
-
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
         return schemaFields;
     }
 }
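
With the private conversion methods gone, the registry's per-schema cache lambda delegates entirely to the shared utility. A caller-level sketch of that delegation, assuming the AvroTypeUtil.createSchema(Schema, String, SchemaIdentifier) overload invoked in the hunks above; the schema text and the use of SchemaIdentifier.EMPTY are illustrative:

    import org.apache.avro.Schema;
    import org.apache.nifi.avro.AvroTypeUtil;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.SchemaIdentifier;

    public class CreateSchemaSketch {
        public static void main(String[] args) {
            final String schemaText = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
                    + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
            final Schema avroSchema = new Schema.Parser().parse(schemaText);

            // Same call the registry's computeIfAbsent lambda now makes; a field whose
            // Avro default is null no longer carries the JsonProperties sentinel as its default.
            final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, SchemaIdentifier.EMPTY);
            System.out.println(recordSchema); // converted NiFi RecordSchema
        }
    }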