NIFI-5138: Bug fix to ensure that when we have a CHOICE between two or more REOCRD types that we choose the appropriate RECORD type when creating the Record in the JSON Reader.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2670.
This commit is contained in:
Mark Payne 2018-05-02 09:13:52 -04:00 committed by Pierre Villard
parent f742a3a6ac
commit 4700b8653d
8 changed files with 154 additions and 9 deletions

View File

@ -78,7 +78,7 @@ public class StandardFieldValue implements FieldValue {
return Arrays.toString((Object[]) value); return Arrays.toString((Object[]) value);
} }
return value.toString(); return String.valueOf(value);
} }
protected static FieldValue validateParentRecord(final FieldValue parent) { protected static FieldValue validateParentRecord(final FieldValue parent) {

View File

@ -52,8 +52,11 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.type.RecordDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataTypeUtils { public class DataTypeUtils {
private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class);
// Regexes for parsing Floating-Point numbers // Regexes for parsing Floating-Point numbers
private static final String OptionalSign = "[\\-\\+]?"; private static final String OptionalSign = "[\\-\\+]?";
@ -192,8 +195,37 @@ public class DataTypeUtils {
return isIntegerTypeCompatible(value); return isIntegerTypeCompatible(value);
case LONG: case LONG:
return isLongTypeCompatible(value); return isLongTypeCompatible(value);
case RECORD: case RECORD: {
return isRecordTypeCompatible(value); if (value == null) {
return false;
}
if (!(value instanceof Record)) {
return false;
}
final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
if (schema == null) {
return true;
}
final Record record = (Record) value;
for (final RecordField childField : schema.getFields()) {
final Object childValue = record.getValue(childField);
if (childValue == null && !childField.isNullable()) {
logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
return false;
}
if (childValue == null) {
continue; // consider compatible
}
if (!isCompatibleDataType(childValue, childField.getDataType())) {
return false;
}
}
return true;
}
case SHORT: case SHORT:
return isShortTypeCompatible(value); return isShortTypeCompatible(value);
case TIME: case TIME:

View File

@ -126,6 +126,8 @@
<exclude>src/test/resources/json/single-element-nested-array.json</exclude> <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
<exclude>src/test/resources/json/single-element-nested.json</exclude> <exclude>src/test/resources/json/single-element-nested.json</exclude>
<exclude>src/test/resources/json/output/dataTypes.json</exclude> <exclude>src/test/resources/json/output/dataTypes.json</exclude>
<exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
<exclude>src/test/resources/json/record-choice.avsc</exclude>
<exclude>src/test/resources/xml/people.xml</exclude> <exclude>src/test/resources/xml/people.xml</exclude>
<exclude>src/test/resources/xml/people2.xml</exclude> <exclude>src/test/resources/xml/people2.xml</exclude>
<exclude>src/test/resources/xml/people3.xml</exclude> <exclude>src/test/resources/xml/people3.xml</exclude>

View File

@ -35,7 +35,9 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonParseException;
@ -134,7 +136,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final ArrayDataType arrayDataType = (ArrayDataType) dataType; final ArrayDataType arrayDataType = (ArrayDataType) dataType;
elementDataType = arrayDataType.getElementType(); elementDataType = arrayDataType.getElementType();
} else { } else {
elementDataType = null; elementDataType = dataType;
} }
for (final JsonNode node : arrayNode) { for (final JsonNode node : arrayNode) {
@ -146,12 +148,34 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
} }
if (fieldNode.isObject()) { if (fieldNode.isObject()) {
RecordSchema childSchema; RecordSchema childSchema = null;
if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
final RecordDataType recordDataType = (RecordDataType) dataType; final RecordDataType recordDataType = (RecordDataType) dataType;
childSchema = recordDataType.getChildSchema(); childSchema = recordDataType.getChildSchema();
} else { } else if (dataType != null && RecordFieldType.CHOICE == dataType.getFieldType()) {
childSchema = null; final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
for (final DataType possibleDataType : choiceDataType.getPossibleSubTypes()) {
if (possibleDataType.getFieldType() != RecordFieldType.RECORD) {
continue;
}
final RecordSchema possibleSchema = ((RecordDataType) possibleDataType).getChildSchema();
final Map<String, Object> childValues = new HashMap<>();
final Iterator<String> fieldNames = fieldNode.getFieldNames();
while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next();
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), possibleSchema.getDataType(childFieldName).orElse(null));
childValues.put(childFieldName, childValue);
}
final Record possibleRecord = new MapRecord(possibleSchema, childValues);
if (DataTypeUtils.isCompatibleDataType(possibleRecord, possibleDataType)) {
return possibleRecord;
}
}
} }
if (childSchema == null) { if (childSchema == null) {
@ -162,7 +186,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final Map<String, Object> childValues = new HashMap<>(); final Map<String, Object> childValues = new HashMap<>();
while (fieldNames.hasNext()) { while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next(); final String childFieldName = fieldNames.next();
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
final DataType childDataType = childSchema.getDataType(childFieldName).orElse(null);
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType);
childValues.put(childFieldName, childValue); childValues.put(childFieldName, childValue);
} }

View File

@ -222,7 +222,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
} }
} }
case CHOICE: { case CHOICE: {
return DataTypeUtils.convertType(getRawNodeValue(fieldNode), desiredType, fieldName); return DataTypeUtils.convertType(getRawNodeValue(fieldNode, desiredType), desiredType, fieldName);
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.nifi.json; package org.apache.nifi.json;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -36,6 +37,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
@ -135,6 +138,62 @@ public class TestJsonTreeRowRecordReader {
} }
} }
@Test
public void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc"));
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
// evaluate first record
final Record firstRecord = reader.nextRecord();
assertNotNull(firstRecord);
final RecordSchema firstOuterSchema = firstRecord.getSchema();
assertEquals(Arrays.asList("id", "child"), firstOuterSchema.getFieldNames());
assertEquals("1234", firstRecord.getValue("id"));
// record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
final List<DataType> firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
assertEquals(2, firstSubTypes.size());
assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
// child record should have a schema with "id" as the only field
final Object childObject = firstRecord.getValue("child");
assertTrue(childObject instanceof Record);
final Record firstChildRecord = (Record) childObject;
final RecordSchema firstChildSchema = firstChildRecord.getSchema();
assertEquals(Arrays.asList("id"), firstChildSchema.getFieldNames());
// evaluate second record
final Record secondRecord = reader.nextRecord();
assertNotNull(secondRecord);
final RecordSchema secondOuterSchema = secondRecord.getSchema();
assertEquals(Arrays.asList("id", "child"), secondOuterSchema.getFieldNames());
assertEquals("1234", secondRecord.getValue("id"));
// record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
final List<DataType> secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
assertEquals(2, secondSubTypes.size());
assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
// child record should have a schema with "name" as the only field
final Object secondChildObject = secondRecord.getValue("child");
assertTrue(secondChildObject instanceof Record);
final Record secondChildRecord = (Record) secondChildObject;
final RecordSchema secondChildSchema = secondChildRecord.getSchema();
assertEquals(Arrays.asList("name"), secondChildSchema.getFieldNames());
assertNull(reader.nextRecord());
}
}
@Test @Test
public void testReadArray() throws IOException, MalformedRecordException { public void testReadArray() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());

View File

@ -0,0 +1,11 @@
[{
"id": "1234",
"child": {
"id": "4321"
}
}, {
"id": "1234",
"child": {
"name": "child"
}
}]

View File

@ -0,0 +1,15 @@
{
"name": "top", "namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "child", "type": [{
"name": "first", "type": "record",
"fields": [{ "name": "name", "type": "string" }]
}, {
"name": "second", "type": "record",
"fields": [{ "name": "id", "type": "string" }]
}]
}
]
}