NIFI-3861: Fixed AvroReader nullable logical types issue

- AvroReader did not convert logical types if those are defined with union
- Consolidated createSchema method in AvroSchemaRegistry and AvroTypeUtil as both has identical implementation and mai
ntaining both would be error-prone

This closes #1779.
This commit is contained in:
Koji Kawamura 2017-05-10 22:13:06 +09:00 committed by Mark Payne
parent b6bdc4a0a8
commit 72de1cbdef
6 changed files with 211 additions and 192 deletions

View File

@ -18,7 +18,6 @@
package org.apache.nifi.avro; package org.apache.nifi.avro;
import org.apache.avro.LogicalType; import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type; import org.apache.avro.Schema.Type;
@ -43,17 +42,25 @@ import java.nio.ByteBuffer;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class AvroTypeUtil { public class AvroTypeUtil {
public static final String AVRO_SCHEMA_FORMAT = "avro"; public static final String AVRO_SCHEMA_FORMAT = "avro";
private static final String LOGICAL_TYPE_DATE = "date";
private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException { public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
if (recordSchema == null) { if (recordSchema == null) {
throw new IllegalArgumentException("RecordSchema cannot be null"); throw new IllegalArgumentException("RecordSchema cannot be null");
@ -78,16 +85,36 @@ public class AvroTypeUtil {
return new Schema.Parser().parse(text); return new Schema.Parser().parse(text);
} }
/**
* Returns a DataType for the given Avro Schema
*
* @param avroSchema the Avro Schema to convert
* @return a Data Type that corresponds to the given Avro Schema
*/
public static DataType determineDataType(final Schema avroSchema) { public static DataType determineDataType(final Schema avroSchema) {
final Type avroType = avroSchema.getType(); final Type avroType = avroSchema.getType();
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType != null) {
final String logicalTypeName = logicalType.getName();
switch (logicalTypeName) {
case LOGICAL_TYPE_DATE:
return RecordFieldType.DATE.getDataType();
case LOGICAL_TYPE_TIME_MILLIS:
case LOGICAL_TYPE_TIME_MICROS:
return RecordFieldType.TIME.getDataType();
case LOGICAL_TYPE_TIMESTAMP_MILLIS:
case LOGICAL_TYPE_TIMESTAMP_MICROS:
return RecordFieldType.TIMESTAMP.getDataType();
}
}
switch (avroType) { switch (avroType) {
case ARRAY:
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
case BYTES: case BYTES:
case FIXED: case FIXED:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
case ARRAY:
final DataType elementType = determineDataType(avroSchema.getElementType());
return RecordFieldType.ARRAY.getArrayDataType(elementType);
case BOOLEAN: case BOOLEAN:
return RecordFieldType.BOOLEAN.getDataType(); return RecordFieldType.BOOLEAN.getDataType();
case DOUBLE: case DOUBLE:
@ -97,36 +124,10 @@ public class AvroTypeUtil {
return RecordFieldType.STRING.getDataType(); return RecordFieldType.STRING.getDataType();
case FLOAT: case FLOAT:
return RecordFieldType.FLOAT.getDataType(); return RecordFieldType.FLOAT.getDataType();
case INT: { case INT:
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType == null) {
return RecordFieldType.INT.getDataType();
}
if (LogicalTypes.date().getName().equals(logicalType.getName())) {
return RecordFieldType.DATE.getDataType();
} else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
return RecordFieldType.TIME.getDataType();
}
return RecordFieldType.INT.getDataType(); return RecordFieldType.INT.getDataType();
} case LONG:
case LONG: {
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType == null) {
return RecordFieldType.LONG.getDataType();
}
if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
return RecordFieldType.TIMESTAMP.getDataType();
} else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
return RecordFieldType.TIMESTAMP.getDataType();
} else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
return RecordFieldType.TIME.getDataType();
}
return RecordFieldType.LONG.getDataType(); return RecordFieldType.LONG.getDataType();
}
case RECORD: { case RECORD: {
final List<Field> avroFields = avroSchema.getFields(); final List<Field> avroFields = avroSchema.getFields();
final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
@ -135,6 +136,7 @@ public class AvroTypeUtil {
final String fieldName = field.name(); final String fieldName = field.name();
final Schema fieldSchema = field.schema(); final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema); final DataType fieldType = determineDataType(fieldSchema);
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
} }
@ -148,9 +150,7 @@ public class AvroTypeUtil {
final DataType valueType = determineDataType(valueSchema); final DataType valueType = determineDataType(valueSchema);
return RecordFieldType.MAP.getMapDataType(valueType); return RecordFieldType.MAP.getMapDataType(valueType);
case UNION: { case UNION: {
final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
.filter(s -> s.getType() != Type.NULL)
.collect(Collectors.toList());
if (nonNullSubSchemas.size() == 1) { if (nonNullSubSchemas.size() == 1) {
return determineDataType(nonNullSubSchemas.get(0)); return determineDataType(nonNullSubSchemas.get(0));
@ -169,11 +169,37 @@ public class AvroTypeUtil {
return null; return null;
} }
private static List<Schema> getNonNullSubSchemas(Schema avroSchema) {
List<Schema> unionFieldSchemas = avroSchema.getTypes();
if (unionFieldSchemas == null) {
return Collections.emptyList();
}
return unionFieldSchemas.stream()
.filter(s -> s.getType() != Type.NULL)
.collect(Collectors.toList());
}
public static RecordSchema createSchema(final Schema avroSchema) { public static RecordSchema createSchema(final Schema avroSchema) {
if (avroSchema == null) { if (avroSchema == null) {
throw new IllegalArgumentException("Avro Schema cannot be null"); throw new IllegalArgumentException("Avro Schema cannot be null");
} }
return createSchema(avroSchema, avroSchema.toString(), SchemaIdentifier.EMPTY);
}
/**
* Converts an Avro Schema to a RecordSchema
*
* @param avroSchema the Avro Schema to convert
* @param schemaText the textual representation of the schema
* @param schemaId the identifier of the schema
* @return the Corresponding Record Schema
*/
public static RecordSchema createSchema(final Schema avroSchema, final String schemaText, final SchemaIdentifier schemaId) {
if (avroSchema == null) {
throw new IllegalArgumentException("Avro Schema cannot be null");
}
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
for (final Field field : avroSchema.getFields()) { for (final Field field : avroSchema.getFields()) {
final String fieldName = field.name(); final String fieldName = field.name();
@ -182,7 +208,7 @@ public class AvroTypeUtil {
recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
} }
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, schemaText, AVRO_SCHEMA_FORMAT, schemaId);
return recordSchema; return recordSchema;
} }
@ -227,7 +253,11 @@ public class AvroTypeUtil {
return rec; return rec;
} }
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) throws IOException { /**
* Convert a raw value to an Avro object to serialize in Avro type system.
* The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema)}.
*/
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
if (rawValue == null) { if (rawValue == null) {
return null; return null;
} }
@ -239,13 +269,13 @@ public class AvroTypeUtil {
return DataTypeUtils.toInteger(rawValue, fieldName); return DataTypeUtils.toInteger(rawValue, fieldName);
} }
if (LogicalTypes.date().getName().equals(logicalType.getName())) { if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final Date date = new Date(longValue); final Date date = new Date(longValue);
final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant()); final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant());
final long days = duration.toDays(); final long days = duration.toDays();
return (int) days; return (int) days;
} else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final Date date = new Date(longValue); final Date date = new Date(longValue);
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
@ -261,14 +291,14 @@ public class AvroTypeUtil {
return DataTypeUtils.toLong(rawValue, fieldName); return DataTypeUtils.toLong(rawValue, fieldName);
} }
if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final Date date = new Date(longValue); final Date date = new Date(longValue);
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
return duration.toMillis() * 1000L; return duration.toMillis() * 1000L;
} else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
return DataTypeUtils.toLong(rawValue, fieldName); return DataTypeUtils.toLong(rawValue, fieldName);
} else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
return DataTypeUtils.toLong(rawValue, fieldName) * 1000L; return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
} }
@ -319,28 +349,25 @@ public class AvroTypeUtil {
} }
return avroRecord; return avroRecord;
case UNION: case UNION:
List<Schema> unionFieldSchemas = fieldSchema.getTypes(); // Ignore null types in union
if (unionFieldSchemas != null) { final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema);
// Ignore null types in union
final List<Schema> nonNullFieldSchemas = unionFieldSchemas.stream()
.filter(s -> s.getType() != Type.NULL)
.collect(Collectors.toList());
// If at least one non-null type exists, find the first compatible type // If at least one non-null type exists, find the first compatible type
if (nonNullFieldSchemas.size() >= 1) { if (nonNullFieldSchemas.size() >= 1) {
for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName); final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName);
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType)) { if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType)
return avroObject; // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
} || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(rawValue, desiredDataType))) {
return avroObject;
} }
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass()
+ " because no compatible types exist in the UNION");
} }
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass()
+ " because no compatible types exist in the UNION");
} }
return null; return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName));
case ARRAY: case ARRAY:
final Object[] objectArray = (Object[]) rawValue; final Object[] objectArray = (Object[]) rawValue;
final List<Object> list = new ArrayList<>(objectArray.length); final List<Object> list = new ArrayList<>(objectArray.length);
@ -399,6 +426,39 @@ public class AvroTypeUtil {
return values; return values;
} }
/**
* Convert value of a nullable union field.
* @param originalValue original value
* @param fieldSchema the union field schema
* @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value
* @return a converted value
*/
private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function<Schema, Object> conversion) {
// Ignore null types in union
final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema);
// If at least one non-null type exists, find the first compatible type
if (nonNullFieldSchemas.size() >= 1) {
for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
final Object convertedValue = conversion.apply(nonNullFieldSchema);
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType)
// For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
|| (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
return convertedValue;
}
}
throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass()
+ " because no compatible types exist in the UNION");
}
return null;
}
/**
* Convert an Avro object to a normal Java objects for further processing.
* The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)}
*/
private static Object normalizeValue(final Object value, final Schema avroSchema) { private static Object normalizeValue(final Object value, final Schema avroSchema) {
if (value == null) { if (value == null) {
return null; return null;
@ -412,10 +472,10 @@ public class AvroTypeUtil {
} }
final String logicalName = logicalType.getName(); final String logicalName = logicalType.getName();
if (LogicalTypes.date().getName().equals(logicalName)) { if (LOGICAL_TYPE_DATE.equals(logicalName)) {
// date logical name means that the value is number of days since Jan 1, 1970 // date logical name means that the value is number of days since Jan 1, 1970
return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value)); return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
} else if (LogicalTypes.timeMillis().equals(logicalName)) { } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
// time-millis logical name means that the value is number of milliseconds since midnight. // time-millis logical name means that the value is number of milliseconds since midnight.
return new java.sql.Time((int) value); return new java.sql.Time((int) value);
} }
@ -429,11 +489,11 @@ public class AvroTypeUtil {
} }
final String logicalName = logicalType.getName(); final String logicalName = logicalType.getName();
if (LogicalTypes.timeMicros().getName().equals(logicalName)) { if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value)); return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
} else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) { } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
return new java.sql.Timestamp((long) value); return new java.sql.Timestamp((long) value);
} else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) { } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value)); return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
} }
break; break;
@ -443,7 +503,7 @@ public class AvroTypeUtil {
final GenericData.Record avroRecord = (GenericData.Record) value; final GenericData.Record avroRecord = (GenericData.Record) value;
return normalizeValue(value, avroRecord.getSchema()); return normalizeValue(value, avroRecord.getSchema());
} }
break; return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema));
case RECORD: case RECORD:
final GenericData.Record record = (GenericData.Record) value; final GenericData.Record record = (GenericData.Record) value;
final Schema recordSchema = record.getSchema(); final Schema recordSchema = record.getSchema();

View File

@ -35,8 +35,8 @@
<artifactId>nifi-record</artifactId> <artifactId>nifi-record</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.avro</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>avro</artifactId> <artifactId>nifi-avro-record-utils</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>

View File

@ -17,34 +17,26 @@
package org.apache.nifi.schemaregistry.services; package org.apache.nifi.schemaregistry.services;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.SchemaIdentifier;
@ -57,12 +49,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
private final Map<String, String> schemaNameToSchemaMap; private final Map<String, String> schemaNameToSchemaMap;
private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>(); private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
private static final String LOGICAL_TYPE_DATE = "date";
private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
public AvroSchemaRegistry() { public AvroSchemaRegistry() {
this.schemaNameToSchemaMap = new HashMap<>(); this.schemaNameToSchemaMap = new HashMap<>();
} }
@ -74,7 +60,8 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
} else { } else {
try { try {
final Schema avroSchema = new Schema.Parser().parse(newValue); final Schema avroSchema = new Schema.Parser().parse(newValue);
final RecordSchema recordSchema = createRecordSchema(avroSchema, newValue, descriptor.getName()); final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName());
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
recordSchemas.put(descriptor.getName(), recordSchema); recordSchemas.put(descriptor.getName(), recordSchema);
} catch (final Exception e) { } catch (final Exception e) {
// not a problem - the service won't be valid and the validation message will indicate what is wrong. // not a problem - the service won't be valid and the validation message will indicate what is wrong.
@ -137,113 +124,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
} }
/**
* Converts an Avro Schema to a RecordSchema
*
* @param avroSchema the Avro Schema to convert
* @param text the textual representation of the schema
* @param schemaName the name of the schema
* @return the Corresponding Record Schema
*/
private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final String schemaName) {
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
for (final Field field : avroSchema.getFields()) {
final String fieldName = field.name();
final DataType dataType = determineDataType(field.schema());
recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
}
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", SchemaIdentifier.ofName(schemaName));
return recordSchema;
}
/**
* Returns a DataType for the given Avro Schema
*
* @param avroSchema the Avro Schema to convert
* @return a Data Type that corresponds to the given Avro Schema
*/
private DataType determineDataType(final Schema avroSchema) {
final Type avroType = avroSchema.getType();
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType != null) {
final String logicalTypeName = logicalType.getName();
switch (logicalTypeName) {
case LOGICAL_TYPE_DATE:
return RecordFieldType.DATE.getDataType();
case LOGICAL_TYPE_TIME_MILLIS:
case LOGICAL_TYPE_TIME_MICROS:
return RecordFieldType.TIME.getDataType();
case LOGICAL_TYPE_TIMESTAMP_MILLIS:
case LOGICAL_TYPE_TIMESTAMP_MICROS:
return RecordFieldType.TIMESTAMP.getDataType();
}
}
switch (avroType) {
case ARRAY:
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
case BYTES:
case FIXED:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
case BOOLEAN:
return RecordFieldType.BOOLEAN.getDataType();
case DOUBLE:
return RecordFieldType.DOUBLE.getDataType();
case ENUM:
case STRING:
return RecordFieldType.STRING.getDataType();
case FLOAT:
return RecordFieldType.FLOAT.getDataType();
case INT:
return RecordFieldType.INT.getDataType();
case LONG:
return RecordFieldType.LONG.getDataType();
case RECORD: {
final List<Field> avroFields = avroSchema.getFields();
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
for (final Field field : avroFields) {
final String fieldName = field.name();
final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema);
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
}
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY);
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
}
case NULL:
return RecordFieldType.STRING.getDataType();
case MAP:
final Schema valueSchema = avroSchema.getValueType();
final DataType valueType = determineDataType(valueSchema);
return RecordFieldType.MAP.getMapDataType(valueType);
case UNION: {
final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
.filter(s -> s.getType() != Type.NULL)
.collect(Collectors.toList());
if (nonNullSubSchemas.size() == 1) {
return determineDataType(nonNullSubSchemas.get(0));
}
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
for (final Schema subSchema : nonNullSubSchemas) {
final DataType childDataType = determineDataType(subSchema);
possibleChildTypes.add(childDataType);
}
return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
}
}
return null;
}
@Override @Override
public Set<SchemaField> getSuppliedSchemaFields() { public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields; return schemaFields;

View File

@ -94,6 +94,7 @@
<excludes combine.children="append"> <excludes combine.children="append">
<exclude>src/test/resources/avro/datatypes.avsc</exclude> <exclude>src/test/resources/avro/datatypes.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude> <exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude> <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude> <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude> <exclude>src/test/resources/csv/single-bank-account.csv</exclude>

View File

@ -62,7 +62,16 @@ public class TestAvroReaderWithEmbeddedSchema {
@Test @Test
public void testLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException { public void testLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc")); final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
testLogicalTypes(schema);
}
@Test
public void testNullableLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types-nullable.avsc"));
testLogicalTypes(schema);
}
private void testLogicalTypes(Schema schema) throws ParseException, IOException, MalformedRecordException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String expectedTime = "2017-04-04 14:20:33.000"; final String expectedTime = "2017-04-04 14:20:33.000";
@ -80,7 +89,7 @@ public class TestAvroReaderWithEmbeddedSchema {
final DataFileWriter<GenericRecord> writer = dataFileWriter.create(schema, baos)) { final DataFileWriter<GenericRecord> writer = dataFileWriter.create(schema, baos)) {
final GenericRecord record = new GenericData.Record(schema); final GenericRecord record = new GenericData.Record(schema);
record.put("timeMillis", millisSinceMidnight); record.put("timeMillis", (int) millisSinceMidnight);
record.put("timeMicros", millisSinceMidnight * 1000L); record.put("timeMicros", millisSinceMidnight * 1000L);
record.put("timestampMillis", timeLong); record.put("timestampMillis", timeLong);
record.put("timestampMicros", timeLong * 1000L); record.put("timestampMicros", timeLong * 1000L);

View File

@ -0,0 +1,69 @@
{
"namespace": "nifi",
"name": "data_types",
"type": "record",
"fields": [
{
"name": "timeMillis",
"type": [
"null",
{
"type": "int",
"logicalType": "time-millis"
}
]
},
{
"name": "timeMicros",
"type": [
"null",
{
"type": "long",
"logicalType": "time-micros"
}
]
},
{
"name": "timestampMillis",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
]
},
{
"name": "timestampMicros",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
]
},
{
"name": "date",
"type": [
"null",
{
"type": "int",
"logicalType": "date"
}
]
},
{
"name": "decimal",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 5,
"scale": 2
}
]
}
]
}