diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java index 589708622c..7526c0c566 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java @@ -78,7 +78,7 @@ public class StandardFieldValue implements FieldValue { return Arrays.toString((Object[]) value); } - return value.toString(); + return String.valueOf(value); } protected static FieldValue validateParentRecord(final FieldValue parent) { diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 477b02a54b..d15f379e27 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -52,8 +52,11 @@ import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.RecordDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DataTypeUtils { + private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class); // Regexes for parsing Floating-Point numbers private static final String OptionalSign = "[\\-\\+]?"; @@ -192,8 +195,37 @@ public class DataTypeUtils { return isIntegerTypeCompatible(value); case LONG: return isLongTypeCompatible(value); - case RECORD: - return isRecordTypeCompatible(value); + case RECORD: { + if (value == null) { + return false; + } + if (!(value instanceof Record)) { + return false; + } + + final RecordSchema schema = ((RecordDataType) dataType).getChildSchema(); + if (schema == null) { + return true; + } + + final Record record = (Record) value; + for (final RecordField childField : schema.getFields()) { + final Object childValue = record.getValue(childField); + if (childValue == null && !childField.isNullable()) { + logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); + return false; + } + if (childValue == null) { + continue; // consider compatible + } + + if (!isCompatibleDataType(childValue, childField.getDataType())) { + return false; + } + } + + return true; + } case SHORT: return isShortTypeCompatible(value); case TIME: diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 0c54b7c1b1..c57e07cde7 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -126,6 +126,8 @@ src/test/resources/json/single-element-nested-array.json src/test/resources/json/single-element-nested.json src/test/resources/json/output/dataTypes.json + src/test/resources/json/elements-for-record-choice.json + src/test/resources/json/record-choice.avsc src/test/resources/xml/people.xml src/test/resources/xml/people2.xml src/test/resources/xml/people3.xml diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index cc08d346f0..4f9a791334 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -35,7 +35,9 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; @@ -134,7 +136,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final ArrayDataType arrayDataType = (ArrayDataType) dataType; elementDataType = arrayDataType.getElementType(); } else { - elementDataType = null; + elementDataType = dataType; } for (final JsonNode node : arrayNode) { @@ -146,12 +148,34 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } if (fieldNode.isObject()) { - RecordSchema childSchema; + RecordSchema childSchema = null; if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { final RecordDataType recordDataType = (RecordDataType) dataType; childSchema = recordDataType.getChildSchema(); - } else { - childSchema = null; + } else if (dataType != null && RecordFieldType.CHOICE == dataType.getFieldType()) { + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + + for (final DataType possibleDataType : choiceDataType.getPossibleSubTypes()) { + if (possibleDataType.getFieldType() != RecordFieldType.RECORD) { + continue; + } + + final RecordSchema possibleSchema = ((RecordDataType) possibleDataType).getChildSchema(); + + final Map childValues = new HashMap<>(); + final Iterator fieldNames = fieldNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String childFieldName = fieldNames.next(); + + final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), possibleSchema.getDataType(childFieldName).orElse(null)); + childValues.put(childFieldName, childValue); + } + + final Record possibleRecord = new MapRecord(possibleSchema, childValues); + if (DataTypeUtils.isCompatibleDataType(possibleRecord, possibleDataType)) { + return possibleRecord; + } + } } if (childSchema == null) { @@ -162,7 +186,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final Map childValues = new HashMap<>(); while (fieldNames.hasNext()) { final String childFieldName = fieldNames.next(); - final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType); + + final DataType childDataType = childSchema.getDataType(childFieldName).orElse(null); + final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType); childValues.put(childFieldName, childValue); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 9e2c965a93..e53fcc01bd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -222,7 +222,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { } } case CHOICE: { - return DataTypeUtils.convertType(getRawNodeValue(fieldNode), desiredType, fieldName); + return DataTypeUtils.convertType(getRawNodeValue(fieldNode, desiredType), desiredType, fieldName); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 73abdff418..d71fd32fc0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -18,6 +18,7 @@ package org.apache.nifi.json; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -36,6 +37,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -135,6 +138,62 @@ public class TestJsonTreeRowRecordReader { } } + @Test + public void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { + final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc")); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) { + + // evaluate first record + final Record firstRecord = reader.nextRecord(); + assertNotNull(firstRecord); + final RecordSchema firstOuterSchema = firstRecord.getSchema(); + assertEquals(Arrays.asList("id", "child"), firstOuterSchema.getFieldNames()); + assertEquals("1234", firstRecord.getValue("id")); + + // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types + assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE); + final List firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes(); + assertEquals(2, firstSubTypes.size()); + assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); + + // child record should have a schema with "id" as the only field + final Object childObject = firstRecord.getValue("child"); + assertTrue(childObject instanceof Record); + final Record firstChildRecord = (Record) childObject; + final RecordSchema firstChildSchema = firstChildRecord.getSchema(); + + assertEquals(Arrays.asList("id"), firstChildSchema.getFieldNames()); + + // evaluate second record + final Record secondRecord = reader.nextRecord(); + assertNotNull(secondRecord); + + final RecordSchema secondOuterSchema = secondRecord.getSchema(); + assertEquals(Arrays.asList("id", "child"), secondOuterSchema.getFieldNames()); + assertEquals("1234", secondRecord.getValue("id")); + + // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types + assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE); + final List secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes(); + assertEquals(2, secondSubTypes.size()); + assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); + + // child record should have a schema with "name" as the only field + final Object secondChildObject = secondRecord.getValue("child"); + assertTrue(secondChildObject instanceof Record); + final Record secondChildRecord = (Record) secondChildObject; + final RecordSchema secondChildSchema = secondChildRecord.getSchema(); + + assertEquals(Arrays.asList("name"), secondChildSchema.getFieldNames()); + + assertNull(reader.nextRecord()); + } + + } + @Test public void testReadArray() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json new file mode 100644 index 0000000000..f15452867a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/elements-for-record-choice.json @@ -0,0 +1,11 @@ +[{ + "id": "1234", + "child": { + "id": "4321" + } +}, { + "id": "1234", + "child": { + "name": "child" + } +}] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc new file mode 100644 index 0000000000..51ea31efb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/record-choice.avsc @@ -0,0 +1,15 @@ +{ + "name": "top", "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "child", "type": [{ + "name": "first", "type": "record", + "fields": [{ "name": "name", "type": "string" }] + }, { + "name": "second", "type": "record", + "fields": [{ "name": "id", "type": "string" }] + }] + } + ] +} \ No newline at end of file