From 72de1cbdef05b0b61ed78871b15f2d06711760a1 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 10 May 2017 22:13:06 +0900 Subject: [PATCH] NIFI-3861: Fixed AvroReader nullable logical types issue - AvroReader did not convert logical types if those are defined with union - Consolidated createSchema method in AvroSchemaRegistry and AvroTypeUtil as both has identical implementation and mai ntaining both would be error-prone This closes #1779. --- .../org/apache/nifi/avro/AvroTypeUtil.java | 192 ++++++++++++------ .../nifi-registry-service/pom.xml | 4 +- .../services/AvroSchemaRegistry.java | 126 +----------- .../pom.xml | 1 + .../TestAvroReaderWithEmbeddedSchema.java | 11 +- .../avro/logical-types-nullable.avsc | 69 +++++++ 6 files changed, 211 insertions(+), 192 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index bfdba3d15a..38e87a8496 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -18,7 +18,6 @@ package org.apache.nifi.avro; import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -43,17 +42,25 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; public class AvroTypeUtil { public static final String AVRO_SCHEMA_FORMAT = "avro"; + private static final String LOGICAL_TYPE_DATE = "date"; + private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; + private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; + private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; + private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; + public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException { if (recordSchema == null) { throw new IllegalArgumentException("RecordSchema cannot be null"); @@ -78,16 +85,36 @@ public class AvroTypeUtil { return new Schema.Parser().parse(text); } + /** + * Returns a DataType for the given Avro Schema + * + * @param avroSchema the Avro Schema to convert + * @return a Data Type that corresponds to the given Avro Schema + */ public static DataType determineDataType(final Schema avroSchema) { final Type avroType = avroSchema.getType(); + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType != null) { + final String logicalTypeName = logicalType.getName(); + switch (logicalTypeName) { + case LOGICAL_TYPE_DATE: + return RecordFieldType.DATE.getDataType(); + case LOGICAL_TYPE_TIME_MILLIS: + case LOGICAL_TYPE_TIME_MICROS: + return RecordFieldType.TIME.getDataType(); + case LOGICAL_TYPE_TIMESTAMP_MILLIS: + case LOGICAL_TYPE_TIMESTAMP_MICROS: + return RecordFieldType.TIMESTAMP.getDataType(); + } + } + switch (avroType) { + case ARRAY: + return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); case BYTES: case FIXED: return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case ARRAY: - final DataType elementType = determineDataType(avroSchema.getElementType()); - return RecordFieldType.ARRAY.getArrayDataType(elementType); case BOOLEAN: return RecordFieldType.BOOLEAN.getDataType(); case DOUBLE: @@ -97,36 +124,10 @@ public class AvroTypeUtil { return RecordFieldType.STRING.getDataType(); case FLOAT: return RecordFieldType.FLOAT.getDataType(); - case INT: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return RecordFieldType.INT.getDataType(); - } - - if (LogicalTypes.date().getName().equals(logicalType.getName())) { - return RecordFieldType.DATE.getDataType(); - } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { - return RecordFieldType.TIME.getDataType(); - } - + case INT: return RecordFieldType.INT.getDataType(); - } - case LONG: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return RecordFieldType.LONG.getDataType(); - } - - if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { - return RecordFieldType.TIMESTAMP.getDataType(); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { - return RecordFieldType.TIMESTAMP.getDataType(); - } else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { - return RecordFieldType.TIME.getDataType(); - } - + case LONG: return RecordFieldType.LONG.getDataType(); - } case RECORD: { final List avroFields = avroSchema.getFields(); final List recordFields = new ArrayList<>(avroFields.size()); @@ -135,6 +136,7 @@ public class AvroTypeUtil { final String fieldName = field.name(); final Schema fieldSchema = field.schema(); final DataType fieldType = determineDataType(fieldSchema); + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); } @@ -148,9 +150,7 @@ public class AvroTypeUtil { final DataType valueType = determineDataType(valueSchema); return RecordFieldType.MAP.getMapDataType(valueType); case UNION: { - final List nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); + final List nonNullSubSchemas = getNonNullSubSchemas(avroSchema); if (nonNullSubSchemas.size() == 1) { return determineDataType(nonNullSubSchemas.get(0)); @@ -169,11 +169,37 @@ public class AvroTypeUtil { return null; } + private static List getNonNullSubSchemas(Schema avroSchema) { + List unionFieldSchemas = avroSchema.getTypes(); + if (unionFieldSchemas == null) { + return Collections.emptyList(); + } + return unionFieldSchemas.stream() + .filter(s -> s.getType() != Type.NULL) + .collect(Collectors.toList()); + } + public static RecordSchema createSchema(final Schema avroSchema) { if (avroSchema == null) { throw new IllegalArgumentException("Avro Schema cannot be null"); } + return createSchema(avroSchema, avroSchema.toString(), SchemaIdentifier.EMPTY); + } + + /** + * Converts an Avro Schema to a RecordSchema + * + * @param avroSchema the Avro Schema to convert + * @param schemaText the textual representation of the schema + * @param schemaId the identifier of the schema + * @return the Corresponding Record Schema + */ + public static RecordSchema createSchema(final Schema avroSchema, final String schemaText, final SchemaIdentifier schemaId) { + if (avroSchema == null) { + throw new IllegalArgumentException("Avro Schema cannot be null"); + } + final List recordFields = new ArrayList<>(avroSchema.getFields().size()); for (final Field field : avroSchema.getFields()) { final String fieldName = field.name(); @@ -182,7 +208,7 @@ public class AvroTypeUtil { recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); } - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, schemaText, AVRO_SCHEMA_FORMAT, schemaId); return recordSchema; } @@ -227,7 +253,11 @@ public class AvroTypeUtil { return rec; } - private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) throws IOException { + /** + * Convert a raw value to an Avro object to serialize in Avro type system. + * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema)}. + */ + private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) { if (rawValue == null) { return null; } @@ -239,13 +269,13 @@ public class AvroTypeUtil { return DataTypeUtils.toInteger(rawValue, fieldName); } - if (LogicalTypes.date().getName().equals(logicalType.getName())) { + if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) { final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final Date date = new Date(longValue); final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant()); final long days = duration.toDays(); return (int) days; - } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { + } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) { final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final Date date = new Date(longValue); final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); @@ -261,14 +291,14 @@ public class AvroTypeUtil { return DataTypeUtils.toLong(rawValue, fieldName); } - if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { + if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) { final long longValue = DataTypeUtils.toLong(rawValue, fieldName); final Date date = new Date(longValue); final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); return duration.toMillis() * 1000L; - } else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { + } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) { return DataTypeUtils.toLong(rawValue, fieldName); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { + } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) { return DataTypeUtils.toLong(rawValue, fieldName) * 1000L; } @@ -319,28 +349,25 @@ public class AvroTypeUtil { } return avroRecord; case UNION: - List unionFieldSchemas = fieldSchema.getTypes(); - if (unionFieldSchemas != null) { - // Ignore null types in union - final List nonNullFieldSchemas = unionFieldSchemas.stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); + // Ignore null types in union + final List nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); - // If at least one non-null type exists, find the first compatible type - if (nonNullFieldSchemas.size() >= 1) { - for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { - final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName); - final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); - if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType)) { - return avroObject; - } + // If at least one non-null type exists, find the first compatible type + if (nonNullFieldSchemas.size() >= 1) { + for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { + final Object avroObject = convertToAvroObject(rawValue, nonNullFieldSchema, fieldName); + final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); + if (DataTypeUtils.isCompatibleDataType(avroObject, desiredDataType) + // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue + || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(rawValue, desiredDataType))) { + return avroObject; } - - throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() - + " because no compatible types exist in the UNION"); } + + throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + + " because no compatible types exist in the UNION"); } - return null; + return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName)); case ARRAY: final Object[] objectArray = (Object[]) rawValue; final List list = new ArrayList<>(objectArray.length); @@ -399,6 +426,39 @@ public class AvroTypeUtil { return values; } + /** + * Convert value of a nullable union field. + * @param originalValue original value + * @param fieldSchema the union field schema + * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value + * @return a converted value + */ + private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function conversion) { + // Ignore null types in union + final List nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); + + // If at least one non-null type exists, find the first compatible type + if (nonNullFieldSchemas.size() >= 1) { + for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { + final Object convertedValue = conversion.apply(nonNullFieldSchema); + final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); + if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType) + // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue + || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) { + return convertedValue; + } + } + + throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() + + " because no compatible types exist in the UNION"); + } + return null; + } + + /** + * Convert an Avro object to a normal Java objects for further processing. + * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)} + */ private static Object normalizeValue(final Object value, final Schema avroSchema) { if (value == null) { return null; @@ -412,10 +472,10 @@ public class AvroTypeUtil { } final String logicalName = logicalType.getName(); - if (LogicalTypes.date().getName().equals(logicalName)) { + if (LOGICAL_TYPE_DATE.equals(logicalName)) { // date logical name means that the value is number of days since Jan 1, 1970 return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value)); - } else if (LogicalTypes.timeMillis().equals(logicalName)) { + } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) { // time-millis logical name means that the value is number of milliseconds since midnight. return new java.sql.Time((int) value); } @@ -429,11 +489,11 @@ public class AvroTypeUtil { } final String logicalName = logicalType.getName(); - if (LogicalTypes.timeMicros().getName().equals(logicalName)) { + if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) { return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value)); - } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) { + } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) { return new java.sql.Timestamp((long) value); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) { + } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) { return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value)); } break; @@ -443,7 +503,7 @@ public class AvroTypeUtil { final GenericData.Record avroRecord = (GenericData.Record) value; return normalizeValue(value, avroRecord.getSchema()); } - break; + return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema)); case RECORD: final GenericData.Record record = (GenericData.Record) value; final Schema recordSchema = record.getSchema(); diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml index 6aceaa02ed..54dc0902ba 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml @@ -35,8 +35,8 @@ nifi-record - org.apache.avro - avro + org.apache.nifi + nifi-avro-record-utils org.apache.nifi diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java index f48d0f579c..169d79ddbc 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java @@ -17,34 +17,26 @@ package org.apache.nifi.schemaregistry.services; import java.io.IOException; -import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import org.apache.avro.LogicalType; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; @@ -57,12 +49,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch private final Map schemaNameToSchemaMap; private final ConcurrentMap recordSchemas = new ConcurrentHashMap<>(); - private static final String LOGICAL_TYPE_DATE = "date"; - private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; - private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; - private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; - private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; - public AvroSchemaRegistry() { this.schemaNameToSchemaMap = new HashMap<>(); } @@ -74,7 +60,8 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch } else { try { final Schema avroSchema = new Schema.Parser().parse(newValue); - final RecordSchema recordSchema = createRecordSchema(avroSchema, newValue, descriptor.getName()); + final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName()); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId); recordSchemas.put(descriptor.getName(), recordSchema); } catch (final Exception e) { // not a problem - the service won't be valid and the validation message will indicate what is wrong. @@ -137,113 +124,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch } - /** - * Converts an Avro Schema to a RecordSchema - * - * @param avroSchema the Avro Schema to convert - * @param text the textual representation of the schema - * @param schemaName the name of the schema - * @return the Corresponding Record Schema - */ - private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final String schemaName) { - final List recordFields = new ArrayList<>(avroSchema.getFields().size()); - for (final Field field : avroSchema.getFields()) { - final String fieldName = field.name(); - final DataType dataType = determineDataType(field.schema()); - - recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", SchemaIdentifier.ofName(schemaName)); - return recordSchema; - } - - /** - * Returns a DataType for the given Avro Schema - * - * @param avroSchema the Avro Schema to convert - * @return a Data Type that corresponds to the given Avro Schema - */ - private DataType determineDataType(final Schema avroSchema) { - final Type avroType = avroSchema.getType(); - - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType != null) { - final String logicalTypeName = logicalType.getName(); - switch (logicalTypeName) { - case LOGICAL_TYPE_DATE: - return RecordFieldType.DATE.getDataType(); - case LOGICAL_TYPE_TIME_MILLIS: - case LOGICAL_TYPE_TIME_MICROS: - return RecordFieldType.TIME.getDataType(); - case LOGICAL_TYPE_TIMESTAMP_MILLIS: - case LOGICAL_TYPE_TIMESTAMP_MICROS: - return RecordFieldType.TIMESTAMP.getDataType(); - } - } - - switch (avroType) { - case ARRAY: - return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); - case BYTES: - case FIXED: - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case BOOLEAN: - return RecordFieldType.BOOLEAN.getDataType(); - case DOUBLE: - return RecordFieldType.DOUBLE.getDataType(); - case ENUM: - case STRING: - return RecordFieldType.STRING.getDataType(); - case FLOAT: - return RecordFieldType.FLOAT.getDataType(); - case INT: - return RecordFieldType.INT.getDataType(); - case LONG: - return RecordFieldType.LONG.getDataType(); - case RECORD: { - final List avroFields = avroSchema.getFields(); - final List recordFields = new ArrayList<>(avroFields.size()); - - for (final Field field : avroFields) { - final String fieldName = field.name(); - final Schema fieldSchema = field.schema(); - final DataType fieldType = determineDataType(fieldSchema); - - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY); - return RecordFieldType.RECORD.getRecordDataType(recordSchema); - } - case NULL: - return RecordFieldType.STRING.getDataType(); - case MAP: - final Schema valueSchema = avroSchema.getValueType(); - final DataType valueType = determineDataType(valueSchema); - return RecordFieldType.MAP.getMapDataType(valueType); - case UNION: { - final List nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); - - if (nonNullSubSchemas.size() == 1) { - return determineDataType(nonNullSubSchemas.get(0)); - } - - final List possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); - for (final Schema subSchema : nonNullSubSchemas) { - final DataType childDataType = determineDataType(subSchema); - possibleChildTypes.add(childDataType); - } - - return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); - } - } - - return null; - } - @Override public Set getSuppliedSchemaFields() { return schemaFields; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 1fdcaf7046..5dc116067d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -94,6 +94,7 @@ src/test/resources/avro/datatypes.avsc src/test/resources/avro/logical-types.avsc + src/test/resources/avro/logical-types-nullable.avsc src/test/resources/csv/extra-white-space.csv src/test/resources/csv/multi-bank-account.csv src/test/resources/csv/single-bank-account.csv diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java index d315b2ef14..bbb62c5b03 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java @@ -62,7 +62,16 @@ public class TestAvroReaderWithEmbeddedSchema { @Test public void testLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException { final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc")); + testLogicalTypes(schema); + } + @Test + public void testNullableLogicalTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types-nullable.avsc")); + testLogicalTypes(schema); + } + + private void testLogicalTypes(Schema schema) throws ParseException, IOException, MalformedRecordException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final String expectedTime = "2017-04-04 14:20:33.000"; @@ -80,7 +89,7 @@ public class TestAvroReaderWithEmbeddedSchema { final DataFileWriter writer = dataFileWriter.create(schema, baos)) { final GenericRecord record = new GenericData.Record(schema); - record.put("timeMillis", millisSinceMidnight); + record.put("timeMillis", (int) millisSinceMidnight); record.put("timeMicros", millisSinceMidnight * 1000L); record.put("timestampMillis", timeLong); record.put("timestampMicros", timeLong * 1000L); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc new file mode 100644 index 0000000000..c846ee7d0d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc @@ -0,0 +1,69 @@ +{ + "namespace": "nifi", + "name": "data_types", + "type": "record", + "fields": [ + { + "name": "timeMillis", + "type": [ + "null", + { + "type": "int", + "logicalType": "time-millis" + } + ] + }, + { + "name": "timeMicros", + "type": [ + "null", + { + "type": "long", + "logicalType": "time-micros" + } + ] + }, + { + "name": "timestampMillis", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ] + }, + { + "name": "timestampMicros", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-micros" + } + ] + }, + { + "name": "date", + "type": [ + "null", + { + "type": "int", + "logicalType": "date" + } + ] + }, + { + "name": "decimal", + "type": [ + "null", + { + "type": "bytes", + "logicalType": "decimal", + "precision": 5, + "scale": 2 + } + ] + } + ] +} \ No newline at end of file