From 45e035686f0a51c29629f6f6dfa1d26496e21997 Mon Sep 17 00:00:00 2001 From: Steve Champagne Date: Wed, 7 Jun 2017 12:36:28 +0000 Subject: [PATCH] NIFI-4029: Allow null Avro default values in HortonworksSchemaRegistry This closes #1894. Signed-off-by: Bryan Bende --- .../org/apache/nifi/avro/AvroTypeUtil.java | 6 +- .../nifi-hwx-schema-registry-service/pom.xml | 4 + .../HortonworksSchemaRegistry.java | 122 +----------------- 3 files changed, 13 insertions(+), 119 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 52c55fc8df..1417e67f15 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -152,7 +152,11 @@ public class AvroTypeUtil { final Schema fieldSchema = field.schema(); final DataType fieldType = determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + if (field.defaultVal() == JsonProperties.NULL_VALUE) { + recordFields.add(new RecordField(fieldName, fieldType, field.aliases())); + } else { + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + } } final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index 79dbc84b50..38e175cf38 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -41,6 +41,10 @@ limitations under the License. org.apache.nifi nifi-record + + org.apache.nifi + nifi-avro-record-utils + org.apache.nifi nifi-schema-registry-service-api diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index d2289a2e0e..3027e5fc03 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -25,12 +25,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.avro.LogicalType; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; @@ -42,10 +39,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.Tuple; @@ -261,7 +254,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); }); } @@ -309,120 +302,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); }); } - /** - * Converts an Avro Schema to a RecordSchema - * - * @param avroSchema the Avro Schema to convert - * @param text the textual representation of the schema - * @param schemaId the id of the schema - * @return the Corresponding Record Schema - */ - private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) { - final List recordFields = new ArrayList<>(avroSchema.getFields().size()); - for (final Field field : avroSchema.getFields()) { - final String fieldName = field.name(); - final DataType dataType = determineDataType(field.schema()); - - recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId); - return recordSchema; - } - - /** - * Returns a DataType for the given Avro Schema - * - * @param avroSchema the Avro Schema to convert - * @return a Data Type that corresponds to the given Avro Schema - */ - private DataType determineDataType(final Schema avroSchema) { - final Type avroType = avroSchema.getType(); - - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType != null) { - final String logicalTypeName = logicalType.getName(); - switch (logicalTypeName) { - case LOGICAL_TYPE_DATE: - return RecordFieldType.DATE.getDataType(); - case LOGICAL_TYPE_TIME_MILLIS: - case LOGICAL_TYPE_TIME_MICROS: - return RecordFieldType.TIME.getDataType(); - case LOGICAL_TYPE_TIMESTAMP_MILLIS: - case LOGICAL_TYPE_TIMESTAMP_MICROS: - return RecordFieldType.TIMESTAMP.getDataType(); - } - } - - switch (avroType) { - case ARRAY: - return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); - case BYTES: - case FIXED: - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case BOOLEAN: - return RecordFieldType.BOOLEAN.getDataType(); - case DOUBLE: - return RecordFieldType.DOUBLE.getDataType(); - case ENUM: - case STRING: - return RecordFieldType.STRING.getDataType(); - case FLOAT: - return RecordFieldType.FLOAT.getDataType(); - case INT: - return RecordFieldType.INT.getDataType(); - case LONG: - return RecordFieldType.LONG.getDataType(); - case RECORD: { - final List avroFields = avroSchema.getFields(); - final List recordFields = new ArrayList<>(avroFields.size()); - - for (final Field field : avroFields) { - final String fieldName = field.name(); - final Schema fieldSchema = field.schema(); - final DataType fieldType = determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY); - return RecordFieldType.RECORD.getRecordDataType(recordSchema); - } - case NULL: - return RecordFieldType.STRING.getDataType(); - case MAP: - final Schema valueSchema = avroSchema.getValueType(); - final DataType valueType = determineDataType(valueSchema); - return RecordFieldType.MAP.getMapDataType(valueType); - case UNION: { - final List nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); - - if (nonNullSubSchemas.size() == 1) { - return determineDataType(nonNullSubSchemas.get(0)); - } - - final List possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); - for (final Schema subSchema : nonNullSubSchemas) { - final DataType childDataType = determineDataType(subSchema); - possibleChildTypes.add(childDataType); - } - - return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); - } - } - - return null; - } - - @Override public Set getSuppliedSchemaFields() { return schemaFields; } -} \ No newline at end of file +}