NIFI-11621: Handle the case of CHOICE fields when inferring the type of ARRAY elements. E.g., support ARRAY<CHOICE<STRING, NULL>>

Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
Mark Payne 2023-05-31 14:54:24 -04:00 committed by Matt Burgess
parent 0344bd3e25
commit 649494f7c1
5 changed files with 107 additions and 10 deletions

View File

@ -229,6 +229,8 @@
<exclude>src/test/resources/json/docs-example.json</exclude> <exclude>src/test/resources/json/docs-example.json</exclude>
<exclude>src/test/resources/json/choice-of-string-or-array-record.json</exclude> <exclude>src/test/resources/json/choice-of-string-or-array-record.json</exclude>
<exclude>src/test/resources/json/choice-of-string-or-array-record.avsc</exclude> <exclude>src/test/resources/json/choice-of-string-or-array-record.avsc</exclude>
<exclude>src/test/resources/json/nested-choice-of-empty-array-or-string.json</exclude>
<exclude>src/test/resources/json/nested-choice-of-record-array-or-string.json</exclude>
<exclude>src/test/resources/syslog/syslog5424/log.txt</exclude> <exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude> <exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude> <exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>

View File

@ -22,13 +22,16 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.type.RecordDataType;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -122,26 +125,52 @@ public abstract class HierarchicalSchemaInference<T> implements SchemaInferenceE
*/ */
private RecordField defaultArrayTypes(final RecordField recordField) { private RecordField defaultArrayTypes(final RecordField recordField) {
final DataType dataType = recordField.getDataType(); final DataType dataType = recordField.getDataType();
if (dataType.getFieldType() == RecordFieldType.ARRAY) { final RecordFieldType fieldType = dataType.getFieldType();
if (((ArrayDataType) dataType).getElementType() == null) { if (fieldType == RecordFieldType.ARRAY) {
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
if (arrayDataType.getElementType() == null) {
return new RecordField(recordField.getFieldName(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), return new RecordField(recordField.getFieldName(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()),
recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable()); recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
} else { } else {
// Iterate over the array element type (using a synthesized temporary RecordField), defaulting any arrays as well // Iterate over the array element type (using a synthesized temporary RecordField), defaulting any arrays as well
ArrayDataType arrayDataType = (ArrayDataType) dataType; final RecordField elementRecordField = new RecordField(recordField.getFieldName() + "_element", arrayDataType.getElementType(), recordField.isNullable());
RecordField elementRecordField = new RecordField(recordField.getFieldName() + "_element", arrayDataType.getElementType(), recordField.isNullable()); final RecordField adjustedElementRecordField = defaultArrayTypes(elementRecordField);
RecordField adjustedElementRecordField = defaultArrayTypes(elementRecordField);
return new RecordField(recordField.getFieldName(), RecordFieldType.ARRAY.getArrayDataType(adjustedElementRecordField.getDataType()), return new RecordField(recordField.getFieldName(), RecordFieldType.ARRAY.getArrayDataType(adjustedElementRecordField.getDataType()),
recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable()); recordField.getDefaultValue(), recordField.getAliases(), recordField.isNullable());
} }
} } else if (fieldType == RecordFieldType.RECORD) {
if (dataType.getFieldType() == RecordFieldType.RECORD) { final RecordDataType recordDataType = (RecordDataType) dataType;
RecordDataType recordDataType = (RecordDataType) dataType; final RecordSchema childSchema = recordDataType.getChildSchema();
RecordSchema childSchema = recordDataType.getChildSchema(); final RecordSchema adjustedRecordSchema = defaultArrayTypes(childSchema);
RecordSchema adjustedRecordSchema = defaultArrayTypes(childSchema);
return new RecordField(recordField.getFieldName(), RecordFieldType.RECORD.getRecordDataType(adjustedRecordSchema), recordField.getDefaultValue(), return new RecordField(recordField.getFieldName(), RecordFieldType.RECORD.getRecordDataType(adjustedRecordSchema), recordField.getDefaultValue(),
recordField.getAliases(), recordField.isNullable()); recordField.getAliases(), recordField.isNullable());
} else if (fieldType == RecordFieldType.CHOICE) {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> choices = choiceDataType.getPossibleSubTypes();
// Use a LinkedHashSet to preserve ordering while at the same time ensuring that we don't add duplicates,
// as resolving null values could cause a duplicate (e.g., if there's a STRING and a NULL, that may become a choice of two STRINGs).
final Set<DataType> defaulted = new LinkedHashSet<>(choices.size());
for (final DataType choice : choices) {
final RecordField choiceRecordField = new RecordField(recordField.getFieldName() + "_choice", choice, recordField.isNullable());
final RecordField defaultedRecordField = defaultArrayTypes(choiceRecordField);
defaulted.add(defaultedRecordField.getDataType());
}
// If there's only 1 possible sub-type, don't use a CHOICE. Instead, just use that type.
if (defaulted.size() == 1) {
return new RecordField(recordField.getFieldName(), defaulted.iterator().next(), recordField.getDefaultValue(), recordField.getAliases(),
recordField.isNullable());
}
// Create a CHOICE for all of the possible types
final List<DataType> defaultedTypeList = new ArrayList<>(defaulted);
return new RecordField(recordField.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(defaultedTypeList), recordField.getDefaultValue(),
recordField.getAliases(), recordField.isNullable());
} }
return recordField; return recordField;

View File

@ -24,6 +24,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -37,7 +38,10 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestJsonSchemaInference { class TestJsonSchemaInference {
@ -124,6 +128,64 @@ class TestJsonSchemaInference {
assertEquals(RecordFieldType.STRING, itemDataElementType.getFieldType()); assertEquals(RecordFieldType.STRING, itemDataElementType.getFieldType());
} }
@Test
public void testNestedChoiceOfArrayTypes() throws IOException {
final RecordSchema schema = inferSchema(new File("src/test/resources/json/nested-choice-of-record-array-or-string.json"));
final DataType testRecordDataType = schema.getDataType("test_record").get();
assertSame(RecordFieldType.RECORD, testRecordDataType.getFieldType());
final RecordDataType recordDataType = (RecordDataType) testRecordDataType;
final DataType childDataType = recordDataType.getChildSchema().getDataType("array_test_record").get();
assertSame(RecordFieldType.CHOICE, childDataType.getFieldType());
final ChoiceDataType childChoiceDataType = (ChoiceDataType) childDataType;
final List<DataType> childChoices = childChoiceDataType.getPossibleSubTypes();
assertEquals(2, childChoices.size());
final DataType firstChoice = childChoices.get(0);
assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
final DataType secondChoice = childChoices.get(1);
assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
final RecordSchema firstChildSchema = ((RecordDataType) firstChoice).getChildSchema();
final DataType firstArrayType = firstChildSchema.getDataType("test_array").get();
assertSame(RecordFieldType.ARRAY, firstArrayType.getFieldType());
final DataType firstArrayElementType = ((ArrayDataType) firstArrayType).getElementType();
assertNotNull(firstArrayElementType);
final RecordFieldType firstArrayFieldType = firstArrayElementType.getFieldType();
final RecordSchema secondChildSchema = ((RecordDataType) secondChoice).getChildSchema();
final DataType secondArrayType = secondChildSchema.getDataType("test_array").get();
assertSame(RecordFieldType.ARRAY, secondArrayType.getFieldType());
final DataType secondArrayElementType = ((ArrayDataType) secondArrayType).getElementType();
assertNotNull(secondArrayElementType);
final RecordFieldType secondArrayFieldType = secondArrayElementType.getFieldType();
// Ensure that one of the arrays is a STRING and the other is a RECORD.
assertTrue(firstArrayFieldType == RecordFieldType.STRING || secondArrayFieldType == RecordFieldType.STRING);
assertTrue(firstArrayFieldType == RecordFieldType.RECORD || secondArrayFieldType == RecordFieldType.RECORD);
assertNotEquals(firstArrayElementType, secondArrayElementType);
}
@Test
public void testNestedChoiceOfEmptyOrStringArray() throws IOException {
final RecordSchema schema = inferSchema(new File("src/test/resources/json/nested-choice-of-empty-array-or-string.json"));
final DataType testRecordDataType = schema.getDataType("test_record").get();
assertSame(RecordFieldType.RECORD, testRecordDataType.getFieldType());
final RecordDataType recordDataType = (RecordDataType) testRecordDataType;
final DataType childDataType = recordDataType.getChildSchema().getDataType("array_test_record").get();
assertSame(RecordFieldType.RECORD, childDataType.getFieldType());
final RecordSchema childSchema = ((RecordDataType) childDataType).getChildSchema();
final DataType arrayDataType = childSchema.getDataType("test_array").get();
assertSame(RecordFieldType.ARRAY, arrayDataType.getFieldType());
final DataType arrayElementType = ((ArrayDataType) arrayDataType).getElementType();
assertSame(RecordFieldType.STRING, arrayElementType.getFieldType());
}
private RecordSchema inferSchema(final File jsonFile) throws IOException { private RecordSchema inferSchema(final File jsonFile) throws IOException {
try (final InputStream in = new FileInputStream(jsonFile); try (final InputStream in = new FileInputStream(jsonFile);
final InputStream bufferedIn = new BufferedInputStream(in)) { final InputStream bufferedIn = new BufferedInputStream(in)) {

View File

@ -0,0 +1,2 @@
[{"test_record":{"array_test_record":{"test_array":[]}}},
{"test_record":{"array_test_record":{"test_array":["test"]}}}]

View File

@ -0,0 +1,2 @@
[{"test_record":{"array_test_record":{"test_array":[ {"greeting": "hello"} ]}}},
{"test_record":{"array_test_record":{"test_array":["test"]}}}]