NIFI-13380: When determining if Record Type A is 'wider' than Record Type B, and both have a RECORD with the same name but different schemas, instead of determining that A is not wider than B, perform a recursive comparison to check if the RECORD within A's schema is wider than the RECORD within B's schema.

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8948
This commit is contained in:
Mark Payne 2024-06-10 15:25:48 -04:00 committed by Matt Burgess
parent c60b0cf4a3
commit a039bc2b76
4 changed files with 186 additions and 4 deletions

View File

@ -125,5 +125,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,7 +17,13 @@
package org.apache.nifi.record.path;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.JsonSchemaInference;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
@ -31,7 +37,10 @@ import org.apache.nifi.uuid5.Uuid5Util;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
@ -241,11 +250,67 @@ public class TestRecordPath {
}
@Test
public void testParent() {
public void testDescendantFieldWithArrayOfRecords() throws IOException, MalformedRecordException {
final String recordJson = """
{
"container" : {
"id" : "0",
"metadata" : {
"filename" : "file1.pdf",
"page.count" : "165"
},
"textElement" : null,
"containers" : [ {
"id" : "1",
"title" : null,
"metadata" : {
"end.page" : 1,
"start.page" : 1
},
"textElement" : {
"text" : "Table of Contents",
"metadata" : { }
},
"containers" : [ ]
} ]
}
}
""";
final JsonSchemaInference schemaInference = new JsonSchemaInference(new TimeValueInference("MM/dd/yyyy", "HH:mm:ss", "MM/dd/yyyy HH:mm:ss"));
final JsonRecordSource jsonRecordSource = new JsonRecordSource(new ByteArrayInputStream(recordJson.getBytes(StandardCharsets.UTF_8)));
final RecordSchema schema = schemaInference.inferSchema(jsonRecordSource);
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(new ByteArrayInputStream(recordJson.getBytes(StandardCharsets.UTF_8)), Mockito.mock(ComponentLog.class),
schema, "MM/dd/yyyy", "HH:mm:ss", "MM/dd/yyyy HH:mm:ss");
final Record record = reader.nextRecord();
final List<FieldValue> fieldValues = RecordPath.compile("//textElement[./text = 'Table of Contents']/metadata/insertion").evaluate(record).getSelectedFields().toList();
assertEquals(1, fieldValues.size());
fieldValues.getFirst().updateValue("Hello");
record.incorporateInactiveFields();
final Record container = (Record) record.getValue("container");
final Object[] containers = (Object[]) container.getValue("containers");
final Record textElement = (Record) (((Record) containers[0]).getValue("textElement"));
final Record metadata = (Record) textElement.getValue("metadata");
assertEquals("Hello", metadata.getValue("insertion"));
final List<RecordField> metadataFields = metadata.getSchema().getFields();
assertEquals(1, metadataFields.size());
assertEquals("insertion", metadataFields.getFirst().getFieldName());
}
private Record createAccountRecord(final int id, final double balance) {
final Map<String, Object> accountValues = new HashMap<>();
accountValues.put("id", 1);
accountValues.put("balance", 123.45D);
final Record accountRecord = new MapRecord(getAccountSchema(), accountValues);
accountValues.put("id", id);
accountValues.put("balance", balance);
return new MapRecord(getAccountSchema(), accountValues);
}
@Test
public void testParent() {
final Record accountRecord = createAccountRecord(1, 123.45D);
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
final Map<String, Object> values = new HashMap<>();
@ -2234,9 +2299,24 @@ public class TestRecordPath {
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountDataType);
final RecordField accountsField = new RecordField("accounts", accountsType);
fields.add(accountsField);
final DataType bankType = RecordFieldType.CHOICE.getChoiceDataType(
RecordFieldType.STRING.getDataType(),
RecordFieldType.RECORD.getRecordDataType(getBankSchema())
);
final RecordField banksField = new RecordField("banks", RecordFieldType.ARRAY.getArrayDataType(bankType));
fields.add(banksField);
return fields;
}
private RecordSchema getBankSchema() {
final DataType accountDataType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountDataType);
final RecordSchema bankSchema = new SimpleRecordSchema(List.of(new RecordField("accounts", accountsType)));
return bankSchema;
}
private RecordSchema getAccountSchema() {
final List<RecordField> accountFields = new ArrayList<>();
accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

View File

@ -1884,9 +1884,43 @@ public class DataTypeUtils {
return Optional.of(otherRecordDataType);
}
// Check if all fields in 'thisSchema' are equal to or wider than all fields in 'otherSchema'
if (isRecordWider(thisSchema, otherSchema)) {
return Optional.of(thisRecordDataType);
}
if (isRecordWider(otherSchema, thisSchema)) {
return Optional.of(otherRecordDataType);
}
return Optional.empty();
}
public static boolean isRecordWider(final RecordSchema potentiallyWider, final RecordSchema potentiallyNarrower) {
final List<RecordField> narrowerFields = potentiallyNarrower.getFields();
for (final RecordField narrowerField : narrowerFields) {
final Optional<RecordField> widerField = potentiallyWider.getField(narrowerField.getFieldName());
if (widerField.isEmpty()) {
return false;
}
if (widerField.get().getDataType().equals(narrowerField.getDataType())) {
continue;
}
final Optional<DataType> widerType = getWiderType(widerField.get().getDataType(), narrowerField.getDataType());
if (widerType.isEmpty()) {
return false;
}
if (!widerType.get().equals(widerField.get().getDataType())) {
return false;
}
}
return true;
}
private static boolean isDecimalType(final RecordFieldType fieldType) {
return switch (fieldType) {
case FLOAT, DOUBLE, DECIMAL -> true;

View File

@ -111,6 +111,67 @@ public class TestDataTypeUtils {
assertEquals(((RecordDataType) widerType.get()).getChildSchema(), widerRecord.getSchema());
}
@Test
public void testWiderRecordWhenChildRecordHasAllFieldsContainedWithin() {
final Record jane = DataTypeUtils.toRecord(Map.of(
"name", "Jane"
), "");
final Record smallRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"child", jane,
"age", 30), "");
final Record janeWithAge = DataTypeUtils.toRecord(Map.of(
"name", "Jane",
"age", 2
), "");
final Record widerRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"fullName", "John Doe",
"child", janeWithAge,
"age", 30), "");
final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()),
RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema()));
assertTrue(widerType.isPresent());
assertEquals(((RecordDataType) widerType.get()).getChildSchema(), widerRecord.getSchema());
}
@Test
public void testIsRecordWiderWithExtraField() {
final Record jane = DataTypeUtils.toRecord(Map.of(
), "");
final Record smallRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"child", jane,
"age", 30), "");
final Record janeWithAge = DataTypeUtils.toRecord(Map.of(
"name", "Jane",
"age", 2
), "");
final Record widerRecord = DataTypeUtils.toRecord(Map.of(
"firstName", "John",
"lastName", "Doe",
"fullName", "John Doe",
"child", janeWithAge,
"age", 30), "");
assertFalse(DataTypeUtils.isRecordWider(smallRecord.getSchema(), widerRecord.getSchema()));
assertTrue(DataTypeUtils.isRecordWider(widerRecord.getSchema(), smallRecord.getSchema()));
assertFalse(DataTypeUtils.isRecordWider(jane.getSchema(), janeWithAge.getSchema()));
assertTrue(DataTypeUtils.isRecordWider(janeWithAge.getSchema(), jane.getSchema()));
}
@Test
public void testWiderRecordDifferingFields() {
final Record firstRecord = DataTypeUtils.toRecord(Map.of(