NIFI-10956: Fix inference issues with mixed arrays (#6763)

This commit is contained in:
Matt Burgess 2022-12-08 09:28:19 -05:00 committed by GitHub
parent 45a31c7286
commit b744fac479
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 303 additions and 23 deletions

View File

@ -1078,7 +1078,7 @@ public class DataTypeUtils {
}
public static boolean isStringTypeCompatible(final Object value) {
return value != null;
return !(value instanceof Record);
}
public static boolean isEnumTypeCompatible(final Object value, final EnumDataType enumType) {
@ -1998,6 +1998,26 @@ public class DataTypeUtils {
final RecordFieldType thisFieldType = thisDataType.getFieldType();
final RecordFieldType otherFieldType = otherDataType.getFieldType();
if (thisFieldType == RecordFieldType.ARRAY && otherFieldType == RecordFieldType.ARRAY) {
// Check for array<null> and return the other (or empty if they are both array<null>). This happens if at some point we inferred an element type of null which
// indicates an empty array, and then we inferred a non-null type for the same field in a different record. The non-null type should be used in that case.
ArrayDataType thisArrayType = (ArrayDataType) thisDataType;
ArrayDataType otherArrayType = (ArrayDataType) otherDataType;
if (thisArrayType.getElementType() == null) {
if (otherArrayType.getElementType() == null) {
return Optional.empty();
} else {
return Optional.of(otherDataType);
}
} else {
if (otherArrayType.getElementType() == null) {
return Optional.of(thisDataType);
} else {
return Optional.empty();
}
}
}
final int thisIntTypeValue = getIntegerTypeValue(thisFieldType);
final int otherIntTypeValue = getIntegerTypeValue(otherFieldType);
final boolean thisIsInt = thisIntTypeValue > -1;

View File

@ -205,6 +205,8 @@
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
<exclude>src/test/resources/json/similar-records.json</exclude>
<exclude>src/test/resources/json/choice-of-array-empty-or-array-record.json</exclude>
<exclude>src/test/resources/json/empty-arrays.json</exclude>
<exclude>src/test/resources/json/choice-of-embedded-similar-records.json</exclude>
<exclude>src/test/resources/json/choice-of-embedded-arrays-and-single-records.json</exclude>
<exclude>src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json</exclude>

View File

@ -96,6 +96,11 @@ public class JsonSchemaInference extends HierarchicalSchemaInference<JsonNode> {
return value.isArray();
}
@Override
@Override
protected boolean isEmptyArray(final JsonNode value) {
    // Only an array node can be an empty array; it is empty when it holds zero elements
    if (!value.isArray()) {
        return false;
    }
    return value.size() == 0;
}
@Override
protected void forEachFieldInRecord(final JsonNode rawRecord, final BiConsumer<String, JsonNode> fieldConsumer) {
final Iterator<Map.Entry<String, JsonNode>> itr = rawRecord.fields();

View File

@ -21,6 +21,8 @@ import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import java.io.IOException;
import java.util.ArrayList;
@ -52,7 +54,13 @@ public abstract class HierarchicalSchemaInference<T> implements SchemaInferenceE
}
}
return createSchema(typeMap, rootElementName);
RecordSchema inferredSchema = createSchema(typeMap, rootElementName);
// Replace array<null> with array<string> in the typeMap. We use array<null> internally for empty arrays because for example if we encounter an empty array in the first record,
// we have no way of knowing the type of elements. If we just decide to use STRING as the type like was previously done, this can cause problems because anything can be coerced
// into a STRING, and if we later encounter an array of Records there, we end up inferring that as a STRING so we end up converting the Record objects into STRINGs.
// Instead, we create an Array where the element type is null, then consider ARRAY[x] wider than ARRAY[null] for any x (other than null). But to cover all cases we have to wait
// until the very end, after making inferences based on all data. At that point if the type is still inferred to be null we can just change it to a STRING.
return defaultArrayTypes(inferredSchema);
}
protected void inferSchema(final T rawRecord, final Map<String, FieldTypeInference> inferences) {
@ -78,29 +86,87 @@ public abstract class HierarchicalSchemaInference<T> implements SchemaInferenceE
final DataType fieldDataType = RecordFieldType.RECORD.getRecordDataType(schema);
typeInference.addPossibleDataType(fieldDataType);
} else if (isArray(value)) {
final FieldTypeInference arrayElementTypeInference = new FieldTypeInference();
forEachRawRecordInArray(value, arrayElement -> inferType(arrayElement, arrayElementTypeInference));
if (isEmptyArray(value)) {
// At this point we don't know the type of array elements as the array is empty, and it is too early to assume an array of strings. Use null as the
// element type for now, and call defaultArrayTypes() when all inferences are complete, to ensure that if there are any arrays with inferred element type
// of null, they default to string for the final schema.
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(null);
typeInference.addPossibleDataType(arrayDataType);
} else {
final FieldTypeInference arrayElementTypeInference = new FieldTypeInference();
forEachRawRecordInArray(value, arrayElement -> inferType(arrayElement, arrayElementTypeInference));
final DataType elementDataType = arrayElementTypeInference.toDataType();
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(elementDataType);
typeInference.addPossibleDataType(arrayDataType);
DataType elementDataType = arrayElementTypeInference.toDataType();
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(elementDataType);
typeInference.addPossibleDataType(arrayDataType);
}
} else {
typeInference.addPossibleDataType(getDataType(value));
}
}
/*
* This method checks a RecordSchema's child fields for array<null> datatypes recursively and replaces them with the default array<string>. This should be called
* after all inferences have been completed.
*/
/*
 * Recursively walks the given schema's fields, replacing any array<null> data type with the default
 * array<string>. Should only be called after all inferences have been completed.
 */
private RecordSchema defaultArrayTypes(final RecordSchema recordSchema) {
    final List<RecordField> adjustedFields = new ArrayList<>(recordSchema.getFieldCount());
    for (final RecordField field : recordSchema.getFields()) {
        adjustedFields.add(defaultArrayTypes(field));
    }
    return new SimpleRecordSchema(adjustedFields, recordSchema.getIdentifier());
}
/*
* This method checks a RecordField for array<null> datatypes recursively and replaces them with the default array<string>
*/
/*
 * Recursively replaces array<null> data types within the given RecordField with the default array<string>.
 * Records are descended into so that nested arrays are defaulted as well; all other field types pass through unchanged.
 */
private RecordField defaultArrayTypes(final RecordField recordField) {
    final DataType dataType = recordField.getDataType();
    final RecordFieldType fieldType = dataType.getFieldType();

    if (fieldType == RecordFieldType.ARRAY) {
        final ArrayDataType arrayDataType = (ArrayDataType) dataType;
        final DataType elementType = arrayDataType.getElementType();

        final DataType adjustedElementType;
        if (elementType == null) {
            // array<null> means only empty arrays were ever seen for this field; default the element type to string
            adjustedElementType = RecordFieldType.STRING.getDataType();
        } else {
            // Recurse into the element type (via a synthesized temporary RecordField) so nested arrays are defaulted too
            final RecordField syntheticElement = new RecordField(recordField.getFieldName() + "_element", elementType, recordField.isNullable());
            adjustedElementType = defaultArrayTypes(syntheticElement).getDataType();
        }

        return new RecordField(recordField.getFieldName(), RecordFieldType.ARRAY.getArrayDataType(adjustedElementType),
            recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
    }

    if (fieldType == RecordFieldType.RECORD) {
        // Descend into the child schema and rebuild the field around the adjusted schema
        final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
        final RecordSchema adjustedChildSchema = defaultArrayTypes(childSchema);
        return new RecordField(recordField.getFieldName(), RecordFieldType.RECORD.getRecordDataType(adjustedChildSchema),
            recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
    }

    // Not an array or record: nothing to adjust
    return recordField;
}
private void inferType(final T value, final FieldTypeInference typeInference) {
if (isObject(value)) {
final RecordSchema schema = createSchema(value);
final DataType fieldDataType = RecordFieldType.RECORD.getRecordDataType(schema);
typeInference.addPossibleDataType(fieldDataType);
} else if (isArray(value)) {
final FieldTypeInference arrayElementTypeInference = new FieldTypeInference();
forEachRawRecordInArray(value, arrayElement -> inferType(arrayElement, arrayElementTypeInference));
if (isEmptyArray(value)) {
// At this point we don't know the type of array elements as the array is empty, and it is too early to assume an array of strings. Use null as the
// element type for now, and call defaultArrayTypes() when all inferences are complete, to ensure that if there are any arrays with inferred element type
// of null, they default to string for the final schema.
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(null);
typeInference.addPossibleDataType(arrayDataType);
} else {
final FieldTypeInference arrayElementTypeInference = new FieldTypeInference();
forEachRawRecordInArray(value, arrayElement -> inferType(arrayElement, arrayElementTypeInference));
final DataType elementDataType = arrayElementTypeInference.toDataType();
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(elementDataType);
typeInference.addPossibleDataType(arrayDataType);
DataType elementDataType = arrayElementTypeInference.toDataType();
final DataType arrayDataType = RecordFieldType.ARRAY.getArrayDataType(elementDataType);
typeInference.addPossibleDataType(arrayDataType);
}
} else {
typeInference.addPossibleDataType(getDataType(value));
}
@ -129,6 +195,8 @@ public abstract class HierarchicalSchemaInference<T> implements SchemaInferenceE
/**
 * @return true if the given raw value represents an array of elements
 */
protected abstract boolean isArray(T value);
/**
 * @return true if the given raw value is an array that contains no elements; used to infer array&lt;null&gt;
 *         so the element type can be decided later (see defaultArrayTypes)
 */
protected abstract boolean isEmptyArray(T value);
/**
 * Invokes the given consumer for each (field name, field value) pair of the given raw record.
 */
protected abstract void forEachFieldInRecord(T rawRecord, BiConsumer<String, T> fieldConsumer);
/**
 * Invokes the given consumer for each element of the given raw array value.
 */
protected abstract void forEachRawRecordInArray(T arrayRecord, Consumer<T> rawRecordConsumer);

View File

@ -60,6 +60,11 @@ public class XmlSchemaInference extends HierarchicalSchemaInference<XmlNode> {
return value.getNodeType() == XmlNodeType.ARRAY;
}
@Override
@Override
protected boolean isEmptyArray(final XmlNode value) {
    // Only array nodes can be empty arrays; such a node is empty when it has no child elements
    if (value.getNodeType() != XmlNodeType.ARRAY) {
        return false;
    }
    final XmlArrayNode arrayNode = (XmlArrayNode) value;
    return arrayNode.getElements().isEmpty();
}
@Override
protected void forEachFieldInRecord(final XmlNode rawRecord, final BiConsumer<String, XmlNode> fieldConsumer) {
final XmlContainerNode container = (XmlContainerNode) rawRecord;

View File

@ -19,8 +19,12 @@ package org.apache.nifi.json;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@ -41,17 +45,7 @@ class TestJsonSchemaInference {
@Test
void testInferenceIncludesAllRecords() throws IOException {
final File file = new File("src/test/resources/json/data-types.json");
final RecordSchema schema;
try (final InputStream in = new FileInputStream(file);
final InputStream bufferedIn = new BufferedInputStream(in)) {
final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
(var, content) -> new JsonRecordSource(content),
new JsonSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
schema = accessStrategy.getSchema(null, bufferedIn, null);
}
final RecordSchema schema = inferSchema(new File("src/test/resources/json/data-types.json"));
assertSame(RecordFieldType.STRING, schema.getDataType("varcharc").get().getFieldType());
assertSame(RecordFieldType.INT, schema.getDataType("uuid").get().getFieldType());
@ -82,4 +76,65 @@ class TestJsonSchemaInference {
assertEquals(Arrays.asList("varcharc", "uuid", "tinyintc", "textc", "datec", "smallintc", "mediumintc", "longintc", "intc", "bigintc",
"floatc", "doublec", "decimalc", "timestampc", "timec", "charc", "tinytextc", "blobc", "mediumtextc", "enumc", "setc", "boolc", "binaryc"), fieldNames);
}
@Test
public void testNestedArrayOfRecords() throws IOException {
    // "itemSchedules" appears first as an empty array; a later record supplies records for it, so the
    // inferred element type must be RECORD rather than a defaulted STRING
    final RecordSchema schema = inferSchema(new File("src/test/resources/json/choice-of-array-empty-or-array-record.json"));

    final RecordField dataField = schema.getField("data").get();
    assertSame(RecordFieldType.RECORD, dataField.getDataType().getFieldType());

    final RecordSchema dataSchema = ((RecordDataType) dataField.getDataType()).getChildSchema();
    final DataType itemsType = dataSchema.getDataType("items").get();
    assertSame(RecordFieldType.ARRAY, itemsType.getFieldType());

    final DataType itemsElementType = ((ArrayDataType) itemsType).getElementType();
    assertEquals(RecordFieldType.RECORD, itemsElementType.getFieldType());

    final RecordSchema itemSchema = ((RecordDataType) itemsElementType).getChildSchema();
    final DataType schedulesType = itemSchema.getField("itemSchedules").get().getDataType();
    assertEquals(RecordFieldType.ARRAY, schedulesType.getFieldType());

    final DataType schedulesElementType = ((ArrayDataType) schedulesType).getElementType();
    assertEquals(RecordFieldType.RECORD, schedulesElementType.getFieldType());
}
@Test
public void testEmptyArrays() throws IOException {
    // Every record contains "itemData": [] only, so the element type can never be inferred from the data.
    // The inference should then default the element type to STRING rather than leaving it null.
    final RecordSchema schema = inferSchema(new File("src/test/resources/json/empty-arrays.json"));

    final DataType itemsDataType = schema.getDataType("items").get();
    assertSame(RecordFieldType.ARRAY, itemsDataType.getFieldType());
    final ArrayDataType itemsArrayType = (ArrayDataType) itemsDataType;
    final DataType itemsElementType = itemsArrayType.getElementType();
    assertEquals(RecordFieldType.RECORD, itemsElementType.getFieldType());

    final RecordSchema itemsSchema = ((RecordDataType) itemsElementType).getChildSchema();
    final RecordField itemDataField = itemsSchema.getField("itemData").get();
    // Renamed from "ItemDataDatatype" to follow lowerCamelCase local-variable naming
    final DataType itemDataDataType = itemDataField.getDataType();
    assertEquals(RecordFieldType.ARRAY, itemDataDataType.getFieldType());
    final ArrayDataType itemDataArrayType = (ArrayDataType) itemDataDataType;
    final DataType itemDataElementType = itemDataArrayType.getElementType();

    // Empty arrays should be inferred as array<string>
    assertEquals(RecordFieldType.STRING, itemDataElementType.getFieldType());
}
/**
 * Infers a RecordSchema from the given JSON file using the schema-inference access strategy.
 *
 * @param jsonFile the JSON file whose records drive the inference
 * @return the inferred schema
 * @throws IOException if the file cannot be read
 */
private RecordSchema inferSchema(final File jsonFile) throws IOException {
    try (final InputStream in = new FileInputStream(jsonFile);
        final InputStream bufferedIn = new BufferedInputStream(in)) {
        final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
            (var, content) -> new JsonRecordSource(content),
            new JsonSchemaInference(timestampInference), Mockito.mock(ComponentLog.class));
        // Return directly rather than assigning to a redundant local variable
        return accessStrategy.getSchema(null, bufferedIn, null);
    }
}
}

View File

@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.NullSuppression;
import org.apache.nifi.schema.access.SchemaNameAsAttribute;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
@ -508,4 +510,46 @@ class TestWriteJsonResult {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
void testChoiceArrayOfStringsOrArrayOfRecords() throws IOException {
    // Renamed from "FILE_LOCATION" to follow lowerCamelCase local-variable naming
    final String fileLocation = "src/test/resources/json/choice-of-array-string-or-array-record.json";

    // Infer the schema from the test file. The original code never closed the FileInputStream;
    // try-with-resources ensures the stream is released.
    final RecordSchema schema;
    try (final FileInputStream schemaInput = new FileInputStream(fileLocation)) {
        final JsonSchemaInference jsonSchemaInference = new JsonSchemaInference(new TimeValueInference(null, null, null));
        schema = jsonSchemaInference.inferSchema(new JsonRecordSource(schemaInput));
    }

    // First element: itemData is an array of strings
    final Map<String, Object> itemData1 = new HashMap<>();
    itemData1.put("itemData", new String[]{"test"});

    // Second element: itemData is an array of records with an int "quantity" field
    final Map<String, Object> quantityMap = new HashMap<>();
    quantityMap.put("quantity", 10);
    final List<RecordField> itemDataRecordFields = new ArrayList<>(1);
    itemDataRecordFields.add(new RecordField("quantity", RecordFieldType.INT.getDataType(), true));
    final RecordSchema quantityRecordSchema = new SimpleRecordSchema(itemDataRecordFields);
    final Record quantityRecord = new MapRecord(quantityRecordSchema, quantityMap);
    final Record[] quantityRecordArray = {quantityRecord};
    final Map<String, Object> itemData2 = new HashMap<>();
    itemData2.put("itemData", quantityRecordArray);

    final Object[] itemDataArray = {itemData1, itemData2};
    final Map<String, Object> values = new HashMap<>();
    values.put("items", itemDataArray);
    final Record topLevelRecord = new MapRecord(schema, values);

    // Write the record back out and verify it round-trips to the original file content
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true,
        NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
        writer.beginRecordSet();
        writer.writeRecord(topLevelRecord);
        writer.finishRecordSet();
    }

    final byte[] data = baos.toByteArray();
    final String expected = new String(Files.readAllBytes(Paths.get(fileLocation)), StandardCharsets.UTF_8);
    final String output = new String(data, StandardCharsets.UTF_8);
    assertEquals(expected, output);
}
}

View File

@ -0,0 +1,59 @@
{
"data": {
"headerWorklists": [],
"items": [
{
"itemId": "10",
"itemStatus": {},
"itemSchedules": [],
"itemWorklists": [
{
"worklistItem": "364141264",
"reasonCode": "I09",
"description": "Incomplete"
}
]
},
{
"itemId": "20",
"itemStatus": {
"code": "1",
"text": "Planned"
},
"itemSchedules": [
{
"scheduleItem": "10",
"scheduledQuantity": {
"amount": 10,
"uom": "KX6"
},
"scheduledVolume": {
"amount": 10000,
"uom": "BX6"
},
"scheduledVolume2": {
"amount": 1589.873,
"uom": "N15"
},
"scheduledWeight": {
"amount": 1211.642,
"uom": "TNZ"
}
}
],
"itemWorklists": [
{
"worklistItem": "364141264",
"reasonCode": "I09",
"description": "Line item is incomplete"
},
{
"worklistItem": "364141265",
"reasonCode": "Z69",
"description": "Incorrect item"
}
]
}
]
}
}

View File

@ -0,0 +1,9 @@
[ {
"items" : [ {
"itemData" : [ "test" ]
}, {
"itemData" : [ {
"quantity" : 10
} ]
} ]
} ]

View File

@ -0,0 +1,13 @@
{
"id": 6076549,
"items": [
{
"itemData": [],
"test": 1
},
{
"itemData": [],
"test": 2
}
]
}