diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 52c55fc8df..1417e67f15 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -152,7 +152,11 @@ public class AvroTypeUtil {
final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema);
- recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+ if (field.defaultVal() == JsonProperties.NULL_VALUE) {
+ recordFields.add(new RecordField(fieldName, fieldType, field.aliases()));
+ } else {
+ recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
+ }
}
final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index 79dbc84b50..38e175cf38 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
org.apache.nifi
nifi-record
+
+ org.apache.nifi
+ nifi-avro-record-utils
+
org.apache.nifi
nifi-schema-registry-service-api
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index d2289a2e0e..3027e5fc03 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -25,12 +25,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -42,10 +39,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.Tuple;
@@ -261,7 +254,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
- return createRecordSchema(schema, schemaText, schemaIdentifier);
+ return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
});
}
@@ -309,120 +302,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText);
return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
final Schema schema = new Schema.Parser().parse(schemaText);
- return createRecordSchema(schema, schemaText, schemaIdentifier);
+ return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier);
});
}
- /**
- * Converts an Avro Schema to a RecordSchema
- *
- * @param avroSchema the Avro Schema to convert
- * @param text the textual representation of the schema
- * @param schemaId the id of the schema
- * @return the Corresponding Record Schema
- */
- private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) {
- final List recordFields = new ArrayList<>(avroSchema.getFields().size());
- for (final Field field : avroSchema.getFields()) {
- final String fieldName = field.name();
- final DataType dataType = determineDataType(field.schema());
-
- recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
- }
-
- final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId);
- return recordSchema;
- }
-
- /**
- * Returns a DataType for the given Avro Schema
- *
- * @param avroSchema the Avro Schema to convert
- * @return a Data Type that corresponds to the given Avro Schema
- */
- private DataType determineDataType(final Schema avroSchema) {
- final Type avroType = avroSchema.getType();
-
- final LogicalType logicalType = avroSchema.getLogicalType();
- if (logicalType != null) {
- final String logicalTypeName = logicalType.getName();
- switch (logicalTypeName) {
- case LOGICAL_TYPE_DATE:
- return RecordFieldType.DATE.getDataType();
- case LOGICAL_TYPE_TIME_MILLIS:
- case LOGICAL_TYPE_TIME_MICROS:
- return RecordFieldType.TIME.getDataType();
- case LOGICAL_TYPE_TIMESTAMP_MILLIS:
- case LOGICAL_TYPE_TIMESTAMP_MICROS:
- return RecordFieldType.TIMESTAMP.getDataType();
- }
- }
-
- switch (avroType) {
- case ARRAY:
- return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
- case BYTES:
- case FIXED:
- return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
- case BOOLEAN:
- return RecordFieldType.BOOLEAN.getDataType();
- case DOUBLE:
- return RecordFieldType.DOUBLE.getDataType();
- case ENUM:
- case STRING:
- return RecordFieldType.STRING.getDataType();
- case FLOAT:
- return RecordFieldType.FLOAT.getDataType();
- case INT:
- return RecordFieldType.INT.getDataType();
- case LONG:
- return RecordFieldType.LONG.getDataType();
- case RECORD: {
- final List avroFields = avroSchema.getFields();
- final List recordFields = new ArrayList<>(avroFields.size());
-
- for (final Field field : avroFields) {
- final String fieldName = field.name();
- final Schema fieldSchema = field.schema();
- final DataType fieldType = determineDataType(fieldSchema);
- recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases()));
- }
-
- final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY);
- return RecordFieldType.RECORD.getRecordDataType(recordSchema);
- }
- case NULL:
- return RecordFieldType.STRING.getDataType();
- case MAP:
- final Schema valueSchema = avroSchema.getValueType();
- final DataType valueType = determineDataType(valueSchema);
- return RecordFieldType.MAP.getMapDataType(valueType);
- case UNION: {
- final List nonNullSubSchemas = avroSchema.getTypes().stream()
- .filter(s -> s.getType() != Type.NULL)
- .collect(Collectors.toList());
-
- if (nonNullSubSchemas.size() == 1) {
- return determineDataType(nonNullSubSchemas.get(0));
- }
-
- final List possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
- for (final Schema subSchema : nonNullSubSchemas) {
- final DataType childDataType = determineDataType(subSchema);
- possibleChildTypes.add(childDataType);
- }
-
- return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
- }
- }
-
- return null;
- }
-
-
@Override
public Set getSuppliedSchemaFields() {
return schemaFields;
}
-}
\ No newline at end of file
+}