mirror of https://github.com/apache/nifi.git
NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly: change the logic that selects the first compatible schema which can have missing fields compared to the real value and search for a more strict match first and fallback to the existing logic only if not one found.
- AbstractJsonRowRecordReader - Handle (meaning log a warning and not fail completely) multi-array CHOICE type when data has extra fields (not defined by the schema) and can't determine correct type. - AvroTypeUtil - Allow multiple different record types in avro union type. Minor refactors. Added documentation fro EqualsWrapper.
This commit is contained in:
parent
77c353219b
commit
a50957161c
|
@ -234,9 +234,13 @@ public class DataTypeUtils {
|
|||
}
|
||||
|
||||
public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
|
||||
return isCompatibleDataType(value, dataType, false);
|
||||
}
|
||||
|
||||
public static boolean isCompatibleDataType(final Object value, final DataType dataType, final boolean strict) {
|
||||
switch (dataType.getFieldType()) {
|
||||
case ARRAY:
|
||||
return isArrayTypeCompatible(value, ((ArrayDataType) dataType).getElementType());
|
||||
return isArrayTypeCompatible(value, ((ArrayDataType) dataType).getElementType(), strict);
|
||||
case BIGINT:
|
||||
return isBigIntTypeCompatible(value);
|
||||
case BOOLEAN:
|
||||
|
@ -259,7 +263,7 @@ public class DataTypeUtils {
|
|||
return isLongTypeCompatible(value);
|
||||
case RECORD: {
|
||||
final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
|
||||
return isRecordTypeCompatible(schema, value);
|
||||
return isRecordTypeCompatible(schema, value, strict);
|
||||
}
|
||||
case SHORT:
|
||||
return isShortTypeCompatible(value);
|
||||
|
@ -629,9 +633,10 @@ public class DataTypeUtils {
|
|||
* Check if the given record structured object compatible with the schema.
|
||||
* @param schema record schema, schema validation will not be performed if schema is null
|
||||
* @param value the record structured object, i.e. Record or Map
|
||||
* @param strict check for a strict match, i.e. all fields in the record should have a corresponding entry in the schema
|
||||
* @return True if the object is compatible with the schema
|
||||
*/
|
||||
private static boolean isRecordTypeCompatible(RecordSchema schema, Object value) {
|
||||
private static boolean isRecordTypeCompatible(RecordSchema schema, Object value, boolean strict) {
|
||||
|
||||
if (value == null) {
|
||||
return false;
|
||||
|
@ -645,6 +650,14 @@ public class DataTypeUtils {
|
|||
return true;
|
||||
}
|
||||
|
||||
if (strict) {
|
||||
if (value instanceof Record) {
|
||||
if (!schema.getFieldNames().containsAll(((Record)value).getRawFieldNames())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (final RecordField childField : schema.getFields()) {
|
||||
final Object childValue;
|
||||
if (value instanceof Record) {
|
||||
|
@ -661,7 +674,7 @@ public class DataTypeUtils {
|
|||
continue; // consider compatible
|
||||
}
|
||||
|
||||
if (!isCompatibleDataType(childValue, childField.getDataType())) {
|
||||
if (!isCompatibleDataType(childValue, childField.getDataType(), strict)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -729,6 +742,10 @@ public class DataTypeUtils {
|
|||
}
|
||||
|
||||
public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) {
|
||||
return isArrayTypeCompatible(value, elementDataType, false);
|
||||
}
|
||||
|
||||
public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType, final boolean strict) {
|
||||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -736,7 +753,7 @@ public class DataTypeUtils {
|
|||
if (value instanceof Object[]) {
|
||||
for (Object o : ((Object[]) value)) {
|
||||
// Check each element to ensure its type is the same or can be coerced (if need be)
|
||||
if (!isCompatibleDataType(o, elementDataType)) {
|
||||
if (!isCompatibleDataType(o, elementDataType, strict)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.util;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Wraps an object to be able to check equality conveniently, even if it doesn't have meaningful equals() implementation
|
||||
* or it's not appropriate for a given context.
|
||||
* This is achieved by providing a list of transformer functions that usually extract properties of the object (and maybe transform them)
|
||||
* and do equality checks on those.
|
||||
* <br/><br/>
|
||||
* Also provides a convenient toString() which can help identifying differences of expected and actual objects during unit tests.
|
||||
* <br/><br/>
|
||||
* Here's an example of a typical use-case:
|
||||
*
|
||||
* <pre>
|
||||
* {@code
|
||||
* @Test
|
||||
* public void testPersonEquals() throws Exception {
|
||||
* // GIVEN
|
||||
* Person expected = new Person();
|
||||
* expected.setName("Joe");
|
||||
* expected.setStuff(Arrays.asList(1, 2, 3));
|
||||
*
|
||||
* Person actual = new Person();
|
||||
* actual.setName("Joe");
|
||||
* actual.setStuff(Arrays.asList(1, 2, 3));
|
||||
*
|
||||
* // WHEN
|
||||
* List<Function<Person, Object>> equalsProperties = Arrays.asList(
|
||||
* Person::getName,
|
||||
* Person::getStuff
|
||||
* );
|
||||
*
|
||||
* EqualsWrapper expectedWrapper = new EqualsWrapper(expected, equalsProperties);
|
||||
* EqualsWrapper actualWrapper = new EqualsWrapper(actual, equalsProperties);
|
||||
*
|
||||
* // THEN
|
||||
* assertEquals(expectedWrapper, actualWrapper);
|
||||
* }
|
||||
*
|
||||
* private class Person {
|
||||
* private String name;
|
||||
* private List<Object> stuff = new ArrayList<>();
|
||||
*
|
||||
* public String getName() {
|
||||
* return name;
|
||||
* }
|
||||
* public void setName(String name) {
|
||||
* this.name = name;
|
||||
* }
|
||||
*
|
||||
* public List<Object> getStuff() {
|
||||
* return stuff;
|
||||
* }
|
||||
* public void setStuff(List<Object> stuff) {
|
||||
* this.stuff = stuff;
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @param <T> The type of object to wrap
|
||||
*/
|
||||
public class EqualsWrapper<T> {
|
||||
private final T item;
|
||||
private final List<Function<T, Object>> propertyProviders;
|
||||
|
||||
/**
|
||||
* Wraps an object and primes it for equality checks.
|
||||
*
|
||||
* @param item The item to be wrapped
|
||||
* @param propertyProviders List of functions with which to extract properties to use for equality checks
|
||||
*/
|
||||
public EqualsWrapper(T item, List<Function<T, Object>> propertyProviders) {
|
||||
this.item = item;
|
||||
this.propertyProviders = propertyProviders;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps multiple objects and primes them for equality checks.
|
||||
*
|
||||
* @param items The items to be wrapped
|
||||
* @param propertyProviders List of functions with with to extract properties to use for equality checks
|
||||
* @param <T> The type of the objects to be wrapped
|
||||
* @return A list of wrapped objects
|
||||
*/
|
||||
public static <T> List<EqualsWrapper<T>> wrapList(Collection<T> items, List<Function<T, Object>> propertyProviders) {
|
||||
List wrappers = items.stream().map(item -> new EqualsWrapper(item, propertyProviders)).collect(Collectors.toList());
|
||||
|
||||
return wrappers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null
|
||||
|| getClass() != o.getClass()
|
||||
|| !Arrays.equals(o.getClass().getGenericInterfaces(), o.getClass().getGenericInterfaces())
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final EqualsWrapper<T> that = (EqualsWrapper<T>) o;
|
||||
final EqualsBuilder equalsBuilder = new EqualsBuilder();
|
||||
|
||||
for (Function<T, Object> propertyProvider : propertyProviders) {
|
||||
equalsBuilder.append(propertyProvider.apply(item), propertyProvider.apply(that.item));
|
||||
}
|
||||
|
||||
return equalsBuilder.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final HashCodeBuilder hashCodeBuilder = new HashCodeBuilder(17, 37);
|
||||
|
||||
for (Function<T, Object> propertyProvider : propertyProviders) {
|
||||
hashCodeBuilder.append(propertyProvider.apply(item));
|
||||
}
|
||||
|
||||
return hashCodeBuilder.toHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringJoiner stringJoiner = new StringJoiner(",\n\t", "{\n\t", "\n}");
|
||||
|
||||
for (Function<T, Object> propertySupplier : propertyProviders) {
|
||||
stringJoiner.add(Optional.ofNullable(propertySupplier.apply(item)).orElse("N/A").toString());
|
||||
}
|
||||
|
||||
return stringJoiner.toString();
|
||||
}
|
||||
}
|
|
@ -232,11 +232,15 @@ public class AvroTypeUtil {
|
|||
final List<Schema> unionTypes = new ArrayList<>(options.size());
|
||||
final Set<Type> typesAdded = new HashSet<>();
|
||||
|
||||
int optionCounter = 1;
|
||||
for (final DataType option : options) {
|
||||
final Schema optionSchema = buildAvroSchema(option, fieldName, fieldNamePrefix, false);
|
||||
if (!typesAdded.contains(optionSchema.getType())) {
|
||||
unionTypes.add(optionSchema);
|
||||
typesAdded.add(optionSchema.getType());
|
||||
} else if (Type.RECORD.equals(optionSchema.getType()) && !unionTypes.contains(optionSchema)) {
|
||||
final Schema indexedOptionSchema = buildAvroSchema(option, fieldName + ++optionCounter, fieldNamePrefix, false);
|
||||
unionTypes.add(indexedOptionSchema);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.avro;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.avro.Conversions;
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
|
@ -857,6 +858,108 @@ public class TestAvroTypeUtil {
|
|||
testSchemaWithReoccurringFieldName(reoccurringFieldName, childRecord11Fields, childRecord21Fields, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaWithArrayOfRecordsThatContainDifferentChildRecordForSameField() throws Exception {
|
||||
// GIVEN
|
||||
SimpleRecordSchema recordSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema recordSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
|
||||
RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(recordSchema1),
|
||||
RecordFieldType.RECORD.getRecordDataType(recordSchema2)
|
||||
))
|
||||
));
|
||||
|
||||
RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
|
||||
)
|
||||
)));
|
||||
|
||||
String expected = "{\n" +
|
||||
" \"type\": \"record\",\n" +
|
||||
" \"name\": \"nifiRecord\",\n" +
|
||||
" \"namespace\": \"org.apache.nifi\",\n" +
|
||||
" \"fields\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"dataCollection\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" \"null\",\n" +
|
||||
" {\n" +
|
||||
" \"type\": \"array\",\n" +
|
||||
" \"items\": {\n" +
|
||||
" \"type\": \"record\",\n" +
|
||||
" \"name\": \"dataCollectionType\",\n" +
|
||||
" \"fields\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"record\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" {\n" +
|
||||
" \"type\": \"record\",\n" +
|
||||
" \"name\": \"dataCollection_recordType\",\n" +
|
||||
" \"fields\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"integer\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" \"null\",\n" +
|
||||
" \"int\"\n" +
|
||||
" ]\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"boolean\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" \"null\",\n" +
|
||||
" \"boolean\"\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"type\": \"record\",\n" +
|
||||
" \"name\": \"dataCollection_record2Type\",\n" +
|
||||
" \"fields\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"integer\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" \"null\",\n" +
|
||||
" \"int\"\n" +
|
||||
" ]\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"string\",\n" +
|
||||
" \"type\": [\n" +
|
||||
" \"null\",\n" +
|
||||
" \"string\"\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" },\n" +
|
||||
" \"null\"\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}";
|
||||
|
||||
// WHEN
|
||||
Schema actual = AvroTypeUtil.extractAvroSchema(schema);
|
||||
|
||||
// THEN
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
assertEquals(mapper.readTree(expected), mapper.readTree(actual.toString()));
|
||||
}
|
||||
|
||||
private void testSchemaWithReoccurringFieldName(String reoccurringFieldName, List<RecordField> childRecord11Fields, List<RecordField> childRecord21Fields, String expected) {
|
||||
// GIVEN
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
|
|
|
@ -157,6 +157,11 @@
|
|||
<exclude>src/test/resources/json/bank-account-multiarray.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
|
||||
<exclude>src/test/resources/json/similar-records.json</exclude>
|
||||
<exclude>src/test/resources/json/choice-of-embedded-similar-records.json</exclude>
|
||||
<exclude>src/test/resources/json/choice-of-embedded-arrays-and-single-records.json</exclude>
|
||||
<exclude>src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json</exclude>
|
||||
<exclude>src/test/resources/json/choice-of-different-arrays-with-extra-fields.json</exclude>
|
||||
<exclude>src/test/resources/json/data-types.json</exclude>
|
||||
<exclude>src/test/resources/json/timestamp.json</exclude>
|
||||
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.text.DateFormat;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -176,6 +177,30 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
|
||||
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
|
||||
elementDataType = arrayDataType.getElementType();
|
||||
} else if (dataType != null && dataType.getFieldType() == RecordFieldType.CHOICE) {
|
||||
List<DataType> possibleSubTypes = ((ChoiceDataType)dataType).getPossibleSubTypes();
|
||||
|
||||
for (DataType possibleSubType : possibleSubTypes) {
|
||||
if (possibleSubType.getFieldType() == RecordFieldType.ARRAY) {
|
||||
ArrayDataType possibleArrayDataType = (ArrayDataType)possibleSubType;
|
||||
DataType possibleElementType = possibleArrayDataType.getElementType();
|
||||
|
||||
final Object[] possibleArrayElements = new Object[numElements];
|
||||
int elementCounter = 0;
|
||||
for (final JsonNode node : arrayNode) {
|
||||
final Object value = getRawNodeValue(node, possibleElementType, fieldName);
|
||||
possibleArrayElements[elementCounter++] = value;
|
||||
}
|
||||
|
||||
if (DataTypeUtils.isArrayTypeCompatible(possibleArrayElements, possibleElementType, true)) {
|
||||
return possibleArrayElements;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Couldn't find proper schema for '{}'. This could lead to some fields filtered out.", fieldName);
|
||||
|
||||
elementDataType = dataType;
|
||||
} else {
|
||||
elementDataType = dataType;
|
||||
}
|
||||
|
@ -231,8 +256,17 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
} else if (dataType != null && RecordFieldType.CHOICE == dataType.getFieldType()) {
|
||||
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
|
||||
|
||||
for (final DataType possibleDataType : choiceDataType.getPossibleSubTypes()) {
|
||||
final Record record = createOptionalRecord(fieldNode, possibleDataType);
|
||||
List<DataType> possibleSubTypes = choiceDataType.getPossibleSubTypes();
|
||||
|
||||
for (final DataType possibleDataType : possibleSubTypes) {
|
||||
final Record record = createOptionalRecord(fieldNode, possibleDataType, true);
|
||||
if (record != null) {
|
||||
return record;
|
||||
}
|
||||
}
|
||||
|
||||
for (final DataType possibleDataType : possibleSubTypes) {
|
||||
final Record record = createOptionalRecord(fieldNode, possibleDataType, false);
|
||||
if (record != null) {
|
||||
return record;
|
||||
}
|
||||
|
@ -246,18 +280,18 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
return createRecordFromRawValue(fieldNode, childSchema);
|
||||
}
|
||||
|
||||
private Record createOptionalRecord(final JsonNode fieldNode, final DataType dataType) throws IOException {
|
||||
private Record createOptionalRecord(final JsonNode fieldNode, final DataType dataType, final boolean strict) throws IOException {
|
||||
if (dataType.getFieldType() == RecordFieldType.RECORD) {
|
||||
final RecordSchema possibleSchema = ((RecordDataType) dataType).getChildSchema();
|
||||
final Record possibleRecord = createRecordFromRawValue(fieldNode, possibleSchema);
|
||||
|
||||
if (DataTypeUtils.isCompatibleDataType(possibleRecord, dataType)) {
|
||||
if (DataTypeUtils.isCompatibleDataType(possibleRecord, dataType, strict)) {
|
||||
return possibleRecord;
|
||||
}
|
||||
} else if (dataType.getFieldType() == RecordFieldType.ARRAY) {
|
||||
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
|
||||
final DataType elementType = arrayDataType.getElementType();
|
||||
final Record record = createOptionalRecord(fieldNode, elementType);
|
||||
final Record record = createOptionalRecord(fieldNode, elementType, strict);
|
||||
return record;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,21 +18,25 @@
|
|||
package org.apache.nifi.json;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
|
||||
import org.apache.nifi.schema.inference.TimeValueInference;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.util.EqualsWrapper;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
|
@ -48,12 +52,14 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestJsonTreeRowRecordReader {
|
||||
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
|
||||
|
@ -127,7 +133,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
|
||||
final byte[] data = Files.readAllBytes(file.toPath());
|
||||
|
||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
final ComponentLog logger = mock(ComponentLog.class);
|
||||
|
||||
int recordCount = 0;
|
||||
final int iterations = 1000;
|
||||
|
@ -156,7 +162,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
|
||||
final byte[] data = Files.readAllBytes(file.toPath());
|
||||
|
||||
final ComponentLog logger = Mockito.mock(ComponentLog.class);
|
||||
final ComponentLog logger = mock(ComponentLog.class);
|
||||
|
||||
int recordCount = 0;
|
||||
final int iterations = 1_000_000;
|
||||
|
@ -183,7 +189,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
// evaluate first record
|
||||
final Record firstRecord = reader.nextRecord();
|
||||
|
@ -238,7 +244,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -264,7 +270,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -291,7 +297,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -317,7 +323,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiarray.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -349,7 +355,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-mixed.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -385,7 +391,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final Record schemaValidatedRecord = reader.nextRecord(true, true);
|
||||
assertEquals(1, schemaValidatedRecord.getValue("id"));
|
||||
|
@ -394,7 +400,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
}
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final Record rawRecord = reader.nextRecord(false, false);
|
||||
assertEquals(1, rawRecord.getValue("id"));
|
||||
|
@ -417,7 +423,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final Record schemaValidatedRecord = reader.nextRecord(true, true);
|
||||
assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema
|
||||
|
@ -428,7 +434,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
}
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final Record rawRecord = reader.nextRecord(false, false);
|
||||
assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1
|
||||
|
@ -452,7 +458,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
|
||||
for (final boolean coerceTypes : new boolean[] {true, false}) {
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
|
||||
|
||||
final Record record = reader.nextRecord(coerceTypes, false);
|
||||
final Object value = record.getValue("timestamp");
|
||||
|
@ -466,7 +472,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -492,7 +498,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(choiceFields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -523,7 +529,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
|
||||
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
|
||||
|
@ -555,7 +561,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {
|
||||
|
@ -583,7 +589,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -617,7 +623,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
|
||||
|
@ -646,7 +652,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
|
||||
|
@ -688,7 +694,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
final Object[] firstRecordValues = reader.nextRecord().getValues();
|
||||
|
||||
|
@ -713,7 +719,7 @@ public class TestJsonTreeRowRecordReader {
|
|||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account-wrong-field-type.json"));
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
|
||||
reader.nextRecord().getValues();
|
||||
Assert.fail("Was able to read record with invalid schema.");
|
||||
|
@ -726,4 +732,383 @@ public class TestJsonTreeRowRecordReader {
|
|||
assertTrue(msg.contains("Boolean"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeOfSimilarRecords() throws Exception {
|
||||
// GIVEN
|
||||
String jsonPath = "src/test/resources/json/similar-records.json";
|
||||
|
||||
RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
|
||||
new RecordField("booleanOrString", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.BOOLEAN.getDataType(),
|
||||
RecordFieldType.STRING.getDataType()
|
||||
)),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
|
||||
List<Object> expected = Arrays.asList(
|
||||
new MapRecord(expectedSchema, new HashMap<String, Object>(){{
|
||||
put("integer", 1);
|
||||
put("boolean", true);
|
||||
put("booleanOrString", true);
|
||||
}}),
|
||||
new MapRecord(expectedSchema, new HashMap<String, Object>(){{
|
||||
put("integer", 2);
|
||||
put("string", "stringValue2");
|
||||
put("booleanOrString", "booleanOrStringValue2");
|
||||
}})
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonPath, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChoiceOfEmbeddedSimilarRecords() throws Exception {
|
||||
// GIVEN
|
||||
String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json";
|
||||
|
||||
SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
|
||||
))
|
||||
));
|
||||
|
||||
List<Object> expected = Arrays.asList(
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
|
||||
put("integer", 1);
|
||||
put("boolean", true);
|
||||
}}));
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema2, new HashMap<String, Object>(){{
|
||||
put("integer", 2);
|
||||
put("string", "stringValue2");
|
||||
}}));
|
||||
}})
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonPath, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
|
||||
// GIVEN
|
||||
String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
|
||||
|
||||
SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
|
||||
))
|
||||
));
|
||||
|
||||
List<Object> expected = Arrays.asList(
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
|
||||
put("integer", 1);
|
||||
}}));
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
|
||||
put("integer", 21);
|
||||
put("boolean", true);
|
||||
}}),
|
||||
new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
|
||||
put("integer", 22);
|
||||
put("boolean", false);
|
||||
}})
|
||||
});
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{
|
||||
put("integer", 3);
|
||||
put("string", "stringValue3");
|
||||
}}));
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
|
||||
put("integer", 41);
|
||||
put("string", "stringValue41");
|
||||
}}),
|
||||
new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
|
||||
put("integer", 42);
|
||||
put("string", "stringValue42");
|
||||
}})
|
||||
});
|
||||
}})
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonPath, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
|
||||
// GIVEN
|
||||
String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
|
||||
|
||||
SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
|
||||
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
|
||||
))
|
||||
));
|
||||
|
||||
List<Object> expected = Arrays.asList(
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
|
||||
put("integer", 1);
|
||||
put("boolean", false);
|
||||
}}));
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
|
||||
put("integer", 21);
|
||||
put("boolean", true);
|
||||
}}),
|
||||
new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
|
||||
put("integer", 22);
|
||||
put("boolean", false);
|
||||
}})
|
||||
});
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{
|
||||
put("integer", 3);
|
||||
put("string", "stringValue3");
|
||||
}}));
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
|
||||
put("integer", 41);
|
||||
put("string", "stringValue41");
|
||||
}}),
|
||||
new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
|
||||
put("integer", 42);
|
||||
put("string", "stringValue42");
|
||||
}}),
|
||||
new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
|
||||
put("integer", 43);
|
||||
put("boolean", false);
|
||||
}})
|
||||
});
|
||||
}})
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonPath, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
|
||||
// GIVEN
|
||||
String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
|
||||
|
||||
SimpleRecordSchema recordSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema recordSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
|
||||
RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
|
||||
))
|
||||
));
|
||||
|
||||
RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
|
||||
RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
|
||||
)
|
||||
)));
|
||||
|
||||
SimpleRecordSchema expectedChildSchema1 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
|
||||
));
|
||||
SimpleRecordSchema expectedChildSchema2 = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("integer", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("string", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
|
||||
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
|
||||
))
|
||||
));
|
||||
|
||||
// Since the actual arrays have records with either (INT, BOOLEAN, STRING) or (INT, STRING, STRING)
|
||||
// while the explicit schema defines only (INT, BOOLEAN) and (INT, STRING) we can't tell which record schema to chose
|
||||
// so we take the first one (INT, BOOLEAN) - as best effort - for both cases
|
||||
SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays = expectedChildSchema1;
|
||||
|
||||
List<Object> expected = Arrays.asList(
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
|
||||
put("integer", 11);
|
||||
put("boolean", true);
|
||||
put("extraString", "extraStringValue11");
|
||||
}}),
|
||||
new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
|
||||
put("integer", 12);
|
||||
put("boolean", false);
|
||||
put("extraString", "extraStringValue12");
|
||||
}})
|
||||
});
|
||||
}}),
|
||||
new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
|
||||
put("record", new Object[]{
|
||||
new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
|
||||
put("integer", 21);
|
||||
put("extraString", "extraStringValue21");
|
||||
put("string", "stringValue21");
|
||||
}}),
|
||||
new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
|
||||
put("integer", 22);
|
||||
put("extraString", "extraStringValue22");
|
||||
put("string", "stringValue22");
|
||||
}})
|
||||
});
|
||||
}})
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonPath, schema, expected);
|
||||
}
|
||||
|
||||
private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
|
||||
// GIVEN
|
||||
final File jsonFile = new File(jsonPath);
|
||||
|
||||
try (
|
||||
InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
|
||||
) {
|
||||
RecordSchema schema = inferSchema(jsonStream);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonStream, schema, expected);
|
||||
}
|
||||
}
|
||||
|
||||
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
|
||||
// GIVEN
|
||||
final File jsonFile = new File(jsonPath);
|
||||
|
||||
try (
|
||||
InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
|
||||
) {
|
||||
// WHEN
|
||||
// THEN
|
||||
testReadRecords(jsonStream, schema, expected);
|
||||
}
|
||||
}
|
||||
|
||||
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
|
||||
// GIVEN
|
||||
try (
|
||||
JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat);
|
||||
) {
|
||||
// WHEN
|
||||
List<Object> actual = new ArrayList<>();
|
||||
Record record;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
List<Object> dataCollection = Arrays.asList((Object[]) record.getValue("dataCollection"));
|
||||
actual.addAll(dataCollection);
|
||||
}
|
||||
|
||||
// THEN
|
||||
List<Function<Object, Object>> propertyProviders = Arrays.asList(
|
||||
_object -> ((Record)_object).getSchema(),
|
||||
_object -> Arrays.stream(((Record)_object).getValues()).map(value -> {
|
||||
if (value != null && value.getClass().isArray()) {
|
||||
return Arrays.asList((Object[]) value);
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}).collect(Collectors.toList())
|
||||
);
|
||||
|
||||
List<EqualsWrapper<Object>> wrappedExpected = EqualsWrapper.wrapList(expected, propertyProviders);
|
||||
List<EqualsWrapper<Object>> wrappedActual = EqualsWrapper.wrapList(actual, propertyProviders);
|
||||
|
||||
assertEquals(wrappedExpected, wrappedActual);
|
||||
}
|
||||
}
|
||||
|
||||
private RecordSchema inferSchema(InputStream jsonStream) throws IOException {
|
||||
RecordSchema schema = new InferSchemaAccessStrategy<>(
|
||||
(__, inputStream) -> new JsonRecordSource(inputStream),
|
||||
new JsonSchemaInference(new TimeValueInference(null, null, null)),
|
||||
mock(ComponentLog.class)
|
||||
).getSchema(Collections.emptyMap(), jsonStream, null);
|
||||
|
||||
jsonStream.reset();
|
||||
|
||||
return schema;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,6 +144,35 @@ public class TestFieldTypeInference {
|
|||
runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToDataTypeWithMultipleComplexRecord() {
|
||||
// GIVEN
|
||||
String fieldName1 = "fieldName1";
|
||||
String fieldName2 = "fieldName2";
|
||||
String fieldName3 = "fieldName3";
|
||||
|
||||
List<DataType> dataTypes = Arrays.asList(
|
||||
RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField(fieldName1, RecordFieldType.INT.getDataType()),
|
||||
new RecordField(fieldName2, RecordFieldType.STRING.getDataType())
|
||||
))),
|
||||
RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField(fieldName1, RecordFieldType.INT.getDataType()),
|
||||
new RecordField(fieldName3, RecordFieldType.BOOLEAN.getDataType())
|
||||
)))
|
||||
);
|
||||
|
||||
DataType expected = RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
|
||||
new RecordField(fieldName1, RecordFieldType.INT.getDataType()),
|
||||
new RecordField(fieldName2, RecordFieldType.STRING.getDataType()),
|
||||
new RecordField(fieldName3, RecordFieldType.BOOLEAN.getDataType())
|
||||
)));
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToDataTypeWhenDecimal() {
|
||||
// GIVEN
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
{
|
||||
"dataCollection":[
|
||||
{
|
||||
"record": [{
|
||||
"integer": 11,
|
||||
"boolean": true,
|
||||
"extraString" : "extraStringValue11"
|
||||
},
|
||||
{
|
||||
"integer": 12,
|
||||
"boolean": false,
|
||||
"extraString" : "extraStringValue12"
|
||||
}]
|
||||
},
|
||||
{
|
||||
"record": [{
|
||||
"integer": 21,
|
||||
"string": "stringValue21",
|
||||
"extraString" : "extraStringValue21"
|
||||
},
|
||||
{
|
||||
"integer": 22,
|
||||
"string": "stringValue22",
|
||||
"extraString" : "extraStringValue22"
|
||||
}]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
{
|
||||
"dataCollection":[
|
||||
{
|
||||
"record": {
|
||||
"integer": 1
|
||||
}
|
||||
},
|
||||
{
|
||||
"record": [{
|
||||
"integer": 21,
|
||||
"boolean": true
|
||||
},
|
||||
{
|
||||
"integer": 22,
|
||||
"boolean": false
|
||||
}]
|
||||
},
|
||||
{
|
||||
"record": {
|
||||
"integer": 3,
|
||||
"string": "stringValue3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"record": [{
|
||||
"integer": 41,
|
||||
"string": "stringValue41"
|
||||
},
|
||||
{
|
||||
"integer": 42,
|
||||
"string": "stringValue42"
|
||||
}]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"dataCollection":[
|
||||
{
|
||||
"record": {
|
||||
"integer": 1,
|
||||
"boolean": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"record": {
|
||||
"integer": 2,
|
||||
"string": "stringValue2"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
{
|
||||
"dataCollection":[
|
||||
{
|
||||
"record": {
|
||||
"integer": 1,
|
||||
"boolean": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"record": [{
|
||||
"integer": 21,
|
||||
"boolean": true
|
||||
},
|
||||
{
|
||||
"integer": 22,
|
||||
"boolean": false
|
||||
}]
|
||||
},
|
||||
{
|
||||
"record": {
|
||||
"integer": 3,
|
||||
"string": "stringValue3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"record": [{
|
||||
"integer": 41,
|
||||
"string": "stringValue41"
|
||||
},
|
||||
{
|
||||
"integer": 42,
|
||||
"string": "stringValue42"
|
||||
},
|
||||
{
|
||||
"integer": 43,
|
||||
"boolean": false
|
||||
}]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"dataCollection":[
|
||||
{
|
||||
"integer": 1,
|
||||
"boolean": true,
|
||||
"booleanOrString": true
|
||||
},
|
||||
{
|
||||
"integer": 2,
|
||||
"string": "stringValue2",
|
||||
"booleanOrString": "booleanOrStringValue2"
|
||||
}
|
||||
]
|
||||
}
|
Loading…
Reference in New Issue