NIFI-7981 - add support for enum type in avro schema

This closes #4648

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Pierre Villard 2020-11-05 21:10:47 +01:00 committed by Mike Thomsen
parent 9a3a659c44
commit 14ec02f21d
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
19 changed files with 234 additions and 30 deletions

View File

@ -20,6 +20,7 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
@ -98,10 +99,15 @@ public enum RecordFieldType {
*/
CHAR("char"),
/**
* An Enum field type.
*/
ENUM("enum", null, new EnumDataType(null)),
/**
* A String field type. Fields of this type use a {@code java.lang.String} value.
*/
STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP),
STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, ENUM),
/**
* <p>
@ -314,6 +320,14 @@ public enum RecordFieldType {
return new ArrayDataType(elementType, elementsNullable);
}
public DataType getEnumDataType(final List<String> enums) {
if (this != ENUM) {
return null;
}
return new EnumDataType(enums);
}
/**
* Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.type;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordFieldType;
import java.util.List;
import java.util.Objects;
public class EnumDataType extends DataType {
private final List<String> enums;
public EnumDataType(final List<String> enums) {
super(RecordFieldType.ENUM, null);
this.enums = enums;
}
public List<String> getEnums() {
return enums;
}
@Override
public RecordFieldType getFieldType() {
return RecordFieldType.ENUM;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof EnumDataType)) return false;
if (!super.equals(o)) return false;
EnumDataType that = (EnumDataType) o;
return Objects.equals(getEnums(), that.getEnums());
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), getEnums());
}
@Override
public String toString() {
return "ENUM" + getEnums();
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.slf4j.Logger;
@ -196,6 +197,8 @@ public class DataTypeUtils {
return toLong(value, fieldName);
case SHORT:
return toShort(value, fieldName);
case ENUM:
return toEnum(value, (EnumDataType) dataType, fieldName);
case STRING:
return toString(value, () -> getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, timestampFormat), charset);
case TIME:
@ -225,7 +228,6 @@ public class DataTypeUtils {
return null;
}
public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
switch (dataType.getFieldType()) {
case ARRAY:
@ -262,6 +264,8 @@ public class DataTypeUtils {
return isTimestampTypeCompatible(value, dataType.getFormat());
case STRING:
return isStringTypeCompatible(value);
case ENUM:
return isEnumTypeCompatible(value, (EnumDataType) dataType);
case MAP:
return isMapTypeCompatible(value);
case CHOICE: {
@ -1025,6 +1029,17 @@ public class DataTypeUtils {
return value != null;
}
public static boolean isEnumTypeCompatible(final Object value, final EnumDataType enumType) {
return enumType.getEnums() != null && enumType.getEnums().contains(value);
}
private static Object toEnum(Object value, EnumDataType dataType, String fieldName) {
if(dataType.getEnums() != null && dataType.getEnums().contains(value)) {
return value.toString();
}
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName);
}
public static java.sql.Date toDate(final Object value, final Supplier<DateFormat> format, final String fieldName) {
if (value == null) {
return null;

View File

@ -365,6 +365,7 @@ public class AvroTypeUtil {
case DOUBLE:
return RecordFieldType.DOUBLE.getDataType();
case ENUM:
return RecordFieldType.ENUM.getEnumDataType(avroSchema.getEnumSymbols());
case STRING:
return RecordFieldType.STRING.getDataType();
case FLOAT:
@ -828,7 +829,12 @@ public class AvroTypeUtil {
case NULL:
return null;
case ENUM:
return new GenericData.EnumSymbol(fieldSchema, rawValue);
List<String> enums = fieldSchema.getEnumSymbols();
if(enums != null && enums.contains(rawValue)) {
return new GenericData.EnumSymbol(fieldSchema, rawValue);
} else {
throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
}
case STRING:
return DataTypeUtils.toString(rawValue, (String) null, charset);
}

View File

@ -23,7 +23,6 @@ import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ReportingTaskStatusDTO;
import javax.xml.bind.annotation.XmlRootElement;
/**
* A serialized representation of this class can be placed in the entity body of a response to the API. This particular entity holds a reference to a reporting task.

View File

@ -290,7 +290,8 @@ public class NiFiOrcUtils {
|| RecordFieldType.BOOLEAN.equals(fieldType)
|| RecordFieldType.DOUBLE.equals(fieldType)
|| RecordFieldType.FLOAT.equals(fieldType)
|| RecordFieldType.STRING.equals(fieldType)) {
|| RecordFieldType.STRING.equals(fieldType)
|| RecordFieldType.ENUM.equals(fieldType)) {
return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType);
}
@ -378,7 +379,7 @@ public class NiFiOrcUtils {
if (RecordFieldType.FLOAT.equals(fieldType)) {
return TypeInfoFactory.getPrimitiveTypeInfo("float");
}
if (RecordFieldType.STRING.equals(fieldType)) {
if (RecordFieldType.STRING.equals(fieldType) || RecordFieldType.ENUM.equals(fieldType)) {
return TypeInfoFactory.getPrimitiveTypeInfo("string");
}
@ -422,7 +423,7 @@ public class NiFiOrcUtils {
if (RecordFieldType.DECIMAL.equals(dataType)) {
return "DECIMAL";
}
if (RecordFieldType.STRING.equals(dataType)) {
if (RecordFieldType.STRING.equals(dataType) || RecordFieldType.ENUM.equals(dataType)) {
return "STRING";
}
if (RecordFieldType.DATE.equals(dataType)) {

View File

@ -593,7 +593,9 @@
<exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude>
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person_bad_enum.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person_long_id.json</exclude>
<exclude>src/test/resources/TestValidateRecord/missing-array.json</exclude>
<exclude>src/test/resources/TestValidateRecord/missing-array.avsc</exclude>

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
@ -212,7 +213,7 @@ public class TestConvertRecord {
runner.setProperty(jsonWriter, "compression-format", "snappy");
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
@ -309,4 +310,62 @@ public class TestConvertRecord {
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
}
@Test
public void testEnumBadValue() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", avroWriter);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.enableControllerService(avroWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
}
@Test
public void testEnumUnionString() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", avroWriter);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.enableControllerService(avroWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
}
}

View File

@ -3,5 +3,6 @@
"name" : {
"last" : "Doe",
"first" : "John"
}
},
"status" : "ACTIVE"
} ]

View File

@ -0,0 +1,8 @@
[ {
"id" : 485,
"name" : {
"last" : "Doe",
"first" : "John"
},
"status" : "ERROR"
} ]

View File

@ -3,5 +3,6 @@
"name" : {
"last" : "Doe",
"first" : "John"
}
},
"status" : "ACTIVE"
} ]

View File

@ -12,6 +12,18 @@
{ "name": "first", "type": "string" }
]
}
}
},
{
"name": "status",
"type": ["null", {
"type": "enum",
"name": "statusEnum",
"symbols": [
"ACTIVE",
"INACTIVE"
]
}],
"default": null
}
]
}

View File

@ -0,0 +1,34 @@
{
"name": "personWithNameRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name", "type": {
"type": "record",
"name": "nameRecord",
"fields": [
{ "name": "last", "type": "string" },
{ "name": "first", "type": "string" }
]
}
},
{
"name": "status",
"type":
[
"null",
{
"type": "enum",
"name": "statusEnum",
"symbols": [
"ACTIVE",
"INACTIVE"
]
},
"string"
],
"default": null
}
]
}

View File

@ -84,22 +84,6 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
return null;
}
private boolean isDateTimeTimestampType(final RecordField field) {
if (field == null) {
return false;
}
final RecordFieldType fieldType = field.getDataType().getFieldType();
switch (fieldType) {
case DATE:
case TIME:
case TIMESTAMP:
return true;
default:
return false;
}
}
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
@ -164,9 +148,11 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
case DOUBLE:
case FLOAT:
case INT:
case BIGINT:
case LONG:
case SHORT:
case STRING:
case ENUM:
case DATE:
case TIME:
case TIMESTAMP: {

View File

@ -134,6 +134,7 @@ public class TestWriteCSVResult {
valueMap.put("record", null);
valueMap.put("choice", 48L);
valueMap.put("array", null);
valueMap.put("enum", null);
final Record record = new MapRecord(schema, valueMap);
final RecordSet rs = RecordSet.of(schema, record);
@ -156,7 +157,7 @@ public class TestWriteCSVResult {
final String values = splits[1];
final StringBuilder expectedBuilder = new StringBuilder();
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",,\"a孟bc李12儒3\",,\"48\",,");
final String expectedValues = expectedBuilder.toString();

View File

@ -101,6 +101,7 @@ public class TestWriteJsonResult {
valueMap.put("timestamp", new Timestamp(time));
valueMap.put("record", null);
valueMap.put("array", null);
valueMap.put("enum", null);
valueMap.put("choice", 48L);
valueMap.put("map", map);

View File

@ -20,7 +20,7 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;

View File

@ -195,6 +195,7 @@ public class TestWriteXMLResult {
valueMap.put("timestamp", new Timestamp(time));
valueMap.put("record", null);
valueMap.put("array", null);
valueMap.put("enum", null);
valueMap.put("choice", 48L);
valueMap.put("map", map);
@ -207,7 +208,7 @@ public class TestWriteXMLResult {
writer.write(rs);
writer.flush();
String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><short>8</short>" +
String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><enum /><short>8</short>" +
"<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><decimal>8.1</decimal>" +
"<date>2017-01-01</date><time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
"<map><height>48</height><width>96</width></map></RECORD></ROOT>";

View File

@ -12,6 +12,7 @@
"date" : "2017-01-01",
"time" : "17:00:00",
"char" : "c",
"enum" : null,
"string" : "string",
"record" : null,
"choice" : 48,