mirror of https://github.com/apache/nifi.git
NIFI-6640 - UNION/CHOICE types not handled correctly
3 important changes:

1. FieldTypeInference had a bug when dealing with multiple datatypes for the same field where some (but not all) were in a wider-than-the-other relationship.
   Before: Some datatypes could be lost. String was wider than any other.
   After: Consistent behaviour. String is NOT wider than any other.
2. Choosing a datatype for a value from a ChoiceDataType: Before, it chose the first compatible datatype as the basis of conversion. After the change, it tries to find the most suitable datatype.
3. Conversion of a value of an Avro union type: Before, it chose the first compatible datatype as the basis of conversion. After the change, it tries to find the most suitable datatype.

Additional change: In the RecordFieldType enum, moved TIMESTAMP ahead of DATE.

This closes #3724.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent ff6a7d9561
commit 34112519c2
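For illustration, a minimal sketch of the new behavior (not part of this commit; the class name is hypothetical), based on the chooseDataType API and the tests added in the diff below. Given a CHOICE of INT and FLOAT, the value 1 now resolves to INT and 1.5f to FLOAT regardless of which subtype is listed first; previously the first compatible subtype won, so 1.5f could be coerced through INT:

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.type.ChoiceDataType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class ChooseDataTypeSketch {
        public static void main(String[] args) {
            // A CHOICE of INT and FLOAT, e.g. as inferred for a column holding both kinds of values
            ChoiceDataType choice = (ChoiceDataType) RecordFieldType.CHOICE.getChoiceDataType(
                    RecordFieldType.INT.getDataType(),
                    RecordFieldType.FLOAT.getDataType());

            DataType forInt = DataTypeUtils.chooseDataType(1, choice);      // -> INT
            DataType forFloat = DataTypeUtils.chooseDataType(1.5f, choice); // -> FLOAT
            System.out.println(forInt.getFieldType() + " / " + forFloat.getFieldType());
        }
    }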
RecordFieldType.java:

@@ -72,6 +72,11 @@ public enum RecordFieldType {
      */
     DOUBLE("double", FLOAT),

+    /**
+     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+     */
+    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
     /**
      * A date field type. Fields of this type use a {@code java.sql.Date} value.
      */
@@ -82,11 +87,6 @@ public enum RecordFieldType {
      */
     TIME("time", "HH:mm:ss"),

-    /**
-     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
-     */
-    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
-
     /**
      * A char field type. Fields of this type use a {@code char} value.
      */
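The enum reordering matters because the new findMostSuitableTypeByStringValue (added in DataTypeUtils below) tries candidate types in enum order, and a timestamp string would presumably also satisfy a lenient DATE parse of its date portion, so TIMESTAMP has to be tried first. A sketch of hypothetical usage (not part of the commit), mirroring testFindMostSuitableTypeWithTimeStamp in the tests below:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class EnumOrderSketch {
        public static void main(String[] args) {
            // The list must be mutable: findMostSuitableTypeByStringValue sorts it in place
            List<DataType> candidates = new ArrayList<>(Arrays.asList(
                    RecordFieldType.DATE.getDataType("yyyy-MM-dd"),
                    RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd HH:mm:ss")));

            Optional<DataType> chosen = DataTypeUtils.findMostSuitableType(
                    "1111-11-11 11:22:33", candidates, Function.identity());

            // Prints TIMESTAMP: it now sorts ahead of DATE in the enum ordering
            System.out.println(chosen.map(DataType::getFieldType).orElse(null));
        }
    }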
DataTypeUtils.java:

@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;

 import java.io.InputStream;
 import java.io.Reader;
+import java.lang.reflect.Array;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -47,16 +48,21 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -225,17 +231,75 @@ public class DataTypeUtils {
     }

     public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
-        for (final DataType subType : choiceType.getPossibleSubTypes()) {
-            if (isCompatibleDataType(value, subType)) {
-                if (subType.getFieldType() == RecordFieldType.CHOICE) {
-                    return chooseDataType(value, (ChoiceDataType) subType);
-                }
+        Queue<DataType> possibleSubTypes = new LinkedList<>(choiceType.getPossibleSubTypes());
+        List<DataType> compatibleSimpleSubTypes = new ArrayList<>();

-                return subType;
+        DataType subType;
+        while ((subType = possibleSubTypes.poll()) != null) {
+            if (subType instanceof ChoiceDataType) {
+                possibleSubTypes.addAll(((ChoiceDataType) subType).getPossibleSubTypes());
+            } else {
+                if (isCompatibleDataType(value, subType)) {
+                    compatibleSimpleSubTypes.add(subType);
+                }
             }
         }

-        return null;
+        int nrOfCompatibleSimpleSubTypes = compatibleSimpleSubTypes.size();
+
+        final DataType chosenSimpleType;
+        if (nrOfCompatibleSimpleSubTypes == 0) {
+            chosenSimpleType = null;
+        } else if (nrOfCompatibleSimpleSubTypes == 1) {
+            chosenSimpleType = compatibleSimpleSubTypes.get(0);
+        } else {
+            chosenSimpleType = findMostSuitableType(value, compatibleSimpleSubTypes, Function.identity())
+                    .orElse(compatibleSimpleSubTypes.get(0));
+        }
+
+        return chosenSimpleType;
+    }
+
+    public static <T> Optional<T> findMostSuitableType(Object value, List<T> types, Function<T, DataType> dataTypeMapper) {
+        if (value instanceof String) {
+            return findMostSuitableTypeByStringValue((String) value, types, dataTypeMapper);
+        } else {
+            DataType inferredDataType = inferDataType(value, null);
+
+            if (inferredDataType != null && !inferredDataType.getFieldType().equals(RecordFieldType.STRING)) {
+                for (T type : types) {
+                    if (inferredDataType.equals(dataTypeMapper.apply(type))) {
+                        return Optional.of(type);
+                    }
+                }
+
+                for (T type : types) {
+                    if (getWiderType(dataTypeMapper.apply(type), inferredDataType).isPresent()) {
+                        return Optional.of(type);
+                    }
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    public static <T> Optional<T> findMostSuitableTypeByStringValue(String valueAsString, List<T> types, Function<T, DataType> dataTypeMapper) {
+        // Sorting based on the RecordFieldType enum ordering looks appropriate here as we want simpler types
+        // first and the enum's ordering seems to reflect that
+        Collections.sort(types, Comparator.comparing(type -> dataTypeMapper.apply(type).getFieldType()));
+
+        for (T type : types) {
+            try {
+                if (isCompatibleDataType(valueAsString, dataTypeMapper.apply(type))) {
+                    return Optional.of(type);
+                }
+            } catch (Exception e) {
+                logger.error("Exception thrown while checking if '" + valueAsString + "' is compatible with '" + type + "'", e);
+            }
+        }
+
+        return Optional.empty();
+    }

     public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
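A usage sketch (not part of the commit; the class name is hypothetical) of how the rewritten chooseDataType flattens nested CHOICE subtypes into one candidate pool before picking the most suitable simple type, mirroring testChooseDataTypeWhenHasChoiceThenShouldReturnSingleMatchingFromChoice in the tests below:

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.type.ChoiceDataType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class NestedChoiceSketch {
        public static void main(String[] args) {
            // CHOICE[INT, DOUBLE, CHOICE[FLOAT, STRING]] - the nested CHOICE is expanded
            // into the queue of candidates, so FLOAT competes directly with INT and DOUBLE
            ChoiceDataType nested = (ChoiceDataType) RecordFieldType.CHOICE.getChoiceDataType(
                    RecordFieldType.INT.getDataType(),
                    RecordFieldType.DOUBLE.getDataType(),
                    RecordFieldType.CHOICE.getChoiceDataType(
                            RecordFieldType.FLOAT.getDataType(),
                            RecordFieldType.STRING.getDataType()));

            DataType chosen = DataTypeUtils.chooseDataType(1.5f, nested);
            System.out.println(chosen.getFieldType()); // FLOAT
        }
    }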
@@ -440,12 +504,12 @@ public class DataTypeUtils {
             // final DataType elementDataType = inferDataType(valueFromMap, RecordFieldType.STRING.getDataType());
             // return RecordFieldType.MAP.getMapDataType(elementDataType);
         }
-        if (value instanceof Object[]) {
-            final Object[] array = (Object[]) value;

+        if (value.getClass().isArray()) {
             DataType mergedDataType = null;
-            for (final Object arrayValue : array) {
-                final DataType inferredDataType = inferDataType(arrayValue, RecordFieldType.STRING.getDataType());

+            int length = Array.getLength(value);
+            for(int index = 0; index < length; index++) {
+                final DataType inferredDataType = inferDataType(Array.get(value, index), RecordFieldType.STRING.getDataType());
                 mergedDataType = mergeDataTypes(mergedDataType, inferredDataType);
             }
@@ -1545,7 +1609,10 @@ public class DataTypeUtils {
             possibleTypes.add(otherDataType);
         }

-        return RecordFieldType.CHOICE.getChoiceDataType(new ArrayList<>(possibleTypes));
+        ArrayList<DataType> possibleChildTypes = new ArrayList<>(possibleTypes);
+        Collections.sort(possibleChildTypes, Comparator.comparing(DataType::getFieldType));
+
+        return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
     }
 }
TestDataTypeUtils.java:

@@ -18,6 +18,7 @@
 package org.apache.nifi.serialization.record;

 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.Test;
@@ -27,10 +28,17 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -324,4 +332,220 @@ public class TestDataTypeUtils {
         }
         assertNotNull(e);
     }
+
+    @Test
+    public void testFindMostSuitableTypeByStringValueShouldReturnEvenWhenOneTypeThrowsException() {
+        String valueAsString = "value";
+
+        String nonMatchingType = "nonMatchingType";
+        String throwsExceptionType = "throwsExceptionType";
+        String matchingType = "matchingType";
+
+        List<String> types = Arrays.asList(
+                nonMatchingType,
+                throwsExceptionType,
+                matchingType
+        );
+        Optional<String> expected = Optional.of(matchingType);
+
+        AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+        Function<String, DataType> dataTypeMapper = type -> {
+            if (type.equals(nonMatchingType)) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            } else if (type.equals(throwsExceptionType)) {
+                return new DataType(RecordFieldType.DATE, null) {
+                    @Override
+                    public String getFormat() {
+                        exceptionThrown.set(true);
+                        throw new RuntimeException("maching error");
+                    }
+                };
+            } else if (type.equals(matchingType)) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            return null;
+        };
+
+        Optional<String> actual = DataTypeUtils.findMostSuitableTypeByStringValue(valueAsString, types, dataTypeMapper);
+        assertTrue("Exception not thrown during test as intended.", exceptionThrown.get());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenInt_vs_INT_FLOAT_ThenShouldReturnINT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1;
+        DataType expected = RecordFieldType.INT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenFloat_vs_INT_FLOAT_ThenShouldReturnFLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenHasChoiceThenShouldReturnSingleMatchingFromChoice() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType(),
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.FLOAT.getDataType(),
+                        RecordFieldType.STRING.getDataType()
+                )
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    private <E> void testChooseDataTypeAlsoReverseTypes(Object value, List<DataType> dataTypes, DataType expected) {
+        testChooseDataType(dataTypes, value, expected);
+        Collections.reverse(dataTypes);
+        testChooseDataType(dataTypes, value, expected);
+    }
+
+    private void testChooseDataType(List<DataType> dataTypes, Object value, DataType expected) {
+        // GIVEN
+        ChoiceDataType choiceDataType = (ChoiceDataType) RecordFieldType.CHOICE.getChoiceDataType(dataTypes.toArray(new DataType[dataTypes.size()]));
+
+        // WHEN
+        DataType actual = DataTypeUtils.chooseDataType(value, choiceDataType);
+
+        // THEN
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithBoolean() {
+        testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithByte() {
+        testFindMostSuitableType(Byte.valueOf((byte) 123), RecordFieldType.BYTE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithShort() {
+        testFindMostSuitableType(Short.valueOf((short) 123), RecordFieldType.SHORT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithInt() {
+        testFindMostSuitableType(123, RecordFieldType.INT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithLong() {
+        testFindMostSuitableType(123L, RecordFieldType.LONG.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithBigInt() {
+        testFindMostSuitableType(BigInteger.valueOf(123L), RecordFieldType.BIGINT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithFloat() {
+        testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithDouble() {
+        testFindMostSuitableType(12.3, RecordFieldType.DOUBLE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithDate() {
+        testFindMostSuitableType("1111-11-11", RecordFieldType.DATE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithTime() {
+        testFindMostSuitableType("11:22:33", RecordFieldType.TIME.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithTimeStamp() {
+        testFindMostSuitableType("1111-11-11 11:22:33", RecordFieldType.TIMESTAMP.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithChar() {
+        testFindMostSuitableType('a', RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithStringShouldReturnChar() {
+        testFindMostSuitableType("abc", RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithString() {
+        testFindMostSuitableType("abc", RecordFieldType.STRING.getDataType(), RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithArray() {
+        testFindMostSuitableType(new int[]{1, 2, 3}, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()));
+    }
+
+    private void testFindMostSuitableType(Object value, DataType expected, DataType... filtered) {
+        List<DataType> filteredOutDataTypes = Arrays.stream(filtered).collect(Collectors.toList());
+
+        // GIVEN
+        List<DataType> unexpectedTypes = Arrays.stream(RecordFieldType.values())
+                .flatMap(recordFieldType -> {
+                    Stream<DataType> dataTypeStream;
+
+                    if (RecordFieldType.ARRAY.equals(recordFieldType)) {
+                        dataTypeStream = Arrays.stream(RecordFieldType.values()).map(elementType -> RecordFieldType.ARRAY.getArrayDataType(elementType.getDataType()));
+                    } else {
+                        dataTypeStream = Stream.of(recordFieldType.getDataType());
+                    }
+
+                    return dataTypeStream;
+                })
+                .filter(dataType -> !dataType.equals(expected))
+                .filter(dataType -> !filteredOutDataTypes.contains(dataType))
+                .collect(Collectors.toList());
+
+        IntStream.rangeClosed(0, unexpectedTypes.size()).forEach(insertIndex -> {
+            List<DataType> allTypes = new LinkedList<>(unexpectedTypes);
+            allTypes.add(insertIndex, expected);
+
+            // WHEN
+            Optional<DataType> actual = DataTypeUtils.findMostSuitableType(value, allTypes, Function.identity());
+
+            // THEN
+            assertEquals(Optional.ofNullable(expected), actual);
+        });
+    }
 }
AvroTypeUtil.java:

@@ -74,6 +74,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 public class AvroTypeUtil {
     private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
@@ -877,6 +878,16 @@ public class AvroTypeUtil {
      */
     private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) {
         boolean foundNonNull = false;

+        Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+                originalValue,
+                fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
+                subSchema -> AvroTypeUtil.determineDataType(subSchema)
+        );
+        if (mostSuitableType.isPresent()) {
+            return conversion.apply(mostSuitableType.get());
+        }
+
         for (final Schema subSchema : fieldSchema.getTypes()) {
             if (subSchema.getType() == Type.NULL) {
                 continue;
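A sketch (not part of the commit; the class name is hypothetical) of the effect on Avro union conversion, mirroring the new TestAvroTypeUtil cases below: converting 1.5f against a union of int and float now picks float even when int is listed first:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.avro.Schema;
    import org.apache.nifi.avro.AvroTypeUtil;

    public class UnionConversionSketch {
        public static void main(String[] args) {
            // A union of int and float, int listed first
            List<Schema> members = Arrays.asList(Schema.Type.INT, Schema.Type.FLOAT).stream()
                    .map(Schema::create)
                    .collect(Collectors.toList());

            // Previously the first compatible member (int) was used as the basis of conversion;
            // now the most suitable member (float) is found, so the fraction is not lost.
            Object converted = AvroTypeUtil.convertToAvroObject(1.5f, Schema.createUnion(members), StandardCharsets.UTF_16);
            System.out.println(converted); // 1.5
        }
    }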
TestAvroTypeUtil.java:

@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;

 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -587,4 +588,56 @@ public class TestAvroTypeUtil {
             assertNotNull(((Record)inner).get("Message"));
         }
     }
+
+    @Test
+    public void testConvertToAvroObjectWhenIntVSUnion_INT_FLOAT_ThenReturnInt() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Integer rawValue = 1;
+
+        Object expected = 1;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, schemaTypes);
+    }
+
+    @Test
+    public void testConvertToAvroObjectWhenFloatVSUnion_INT_FLOAT_ThenReturnFloat() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Float rawValue = 1.5f;
+
+        Object expected = 1.5f;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, schemaTypes);
+    }
+
+    private void testConvertToAvroObjectAlsoReverseSchemaList(Object expected, Object rawValue, List<Schema.Type> schemaTypes) {
+        // GIVEN
+        List<Schema> schemaList = schemaTypes.stream()
+                .map(Schema::create)
+                .collect(Collectors.toList());
+
+        // WHEN
+        Object actual = AvroTypeUtil.convertToAvroObject(rawValue, Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actual);
+
+        // WHEN
+        Collections.reverse(schemaList);
+        Object actualAfterReverse = AvroTypeUtil.convertToAvroObject(rawValue, Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actualAfterReverse);
+    }
 }
pom.xml (nifi-standard-processors):

@@ -411,6 +411,14 @@
                         <exclude>src/test/resources/TestMergeContent/head</exclude>
                         <exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
                         <exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.json</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_header.csv</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.without_header.csv</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.xml</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.without_schema.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/explicit.schema.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
AbstractConversionIT.java (new file):

@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.csv.CSVReader;
+import org.apache.nifi.csv.CSVRecordSetWriter;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.xml.XMLReader;
+import org.apache.nifi.xml.XMLRecordSetWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractConversionIT {
+    protected RecordReaderFactory reader;
+    protected Consumer<TestRunner> inputHandler;
+    protected Consumer<TestRunner> readerConfigurer;
+
+    protected RecordSetWriterFactory writer;
+    protected Consumer<MockFlowFile> resultHandler;
+    protected Consumer<TestRunner> writerConfigurer;
+
+    @Before
+    public void setUp() throws Exception {
+        reader = null;
+        inputHandler = null;
+        readerConfigurer = null;
+
+        writer = null;
+        resultHandler = null;
+        writerConfigurer = null;
+    }
+
+    @Test
+    public void testCsvToJson() throws Exception {
+        fromCsv(csvPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvro() throws Exception {
+        fromCsv(csvPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvroToCsv() throws Exception {
+        fromCsv(csvPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toCsv(csvPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testCsvToXml() throws Exception {
+        fromCsv(csvPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToCsv() throws Exception {
+        fromJson(jsonPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvroToJson() throws Exception {
+        fromJson(jsonPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toJson(jsonPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testAvroToCsv() throws Exception {
+        fromAvro(avroPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testAvroToJson() throws Exception {
+        fromAvro(avroPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testAvroToXml() throws Exception {
+        fromAvro(avroPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToCsv() throws Exception {
+        fromXml(xmlPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToJson() throws Exception {
+        fromXml(xmlPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvro() throws Exception {
+        fromXml(xmlPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvroToXml() throws Exception {
+        fromXml(xmlPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toXml(xmlPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    abstract protected String csvPostfix();
+
+    abstract protected String jsonPostfix();
+
+    abstract protected String avroPostfix();
+
+    abstract protected String xmlPostfix();
+
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+    }
+
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+    }
+
+    protected void fromCsv(String postfix) {
+        reader = new CSVReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromJson(String postfix) {
+        reader = new JsonTreeReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromXml(String postfix) {
+        reader = new XMLReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+            testRunner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
+        };
+    }
+
+    protected void fromAvro(String postfix) {
+        reader = new AvroReader();
+        inputHandler = byteInputHandler(getByteContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void toCsv(String postfix) {
+        writer = new CSVRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected void toJson(String postfix) {
+        writer = new JsonRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "Pretty Print JSON", "true");
+        };
+    }
+
+    protected void toXml(String postfix) {
+        writer = new XMLRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "pretty_print_xml", "true");
+            testRunner.setProperty(writer, "root_tag_name", "root");
+            testRunner.setProperty(writer, "record_tag_name", "nifiRecord");
+        };
+    }
+
+    protected void toAvro(String postfix) {
+        writer = new AvroRecordSetWriter();
+        resultHandler = mockFlowFile -> {
+            try {
+                List<Map<String, Object>> expected = getRecords(getByteContent(postfix));
+                List<Map<String, Object>> actual = getRecords(mockFlowFile.toByteArray());
+
+                assertEquals(expected, actual);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected Consumer<TestRunner> stringInputHandler(String input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<TestRunner> byteInputHandler(byte[] input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<MockFlowFile> stringOutputHandler(String expected) {
+        return mockFlowFile -> mockFlowFile.assertContentEquals(expected);
+    }
+
+    protected String getContent(String postfix) {
+        return new String(getByteContent(postfix));
+    }
+
+    protected byte[] getByteContent(String postfix) {
+        try {
+            return Files.readAllBytes(Paths.get("src/test/resources/TestConversions/data.int_float_string." + postfix));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected List<Map<String, Object>> getRecords(byte[] avroData) throws IOException, MalformedRecordException {
+        try (RecordReader reader = new AvroReaderWithEmbeddedSchema(new ByteArrayInputStream(avroData));) {
+            return getRecords(reader);
+        }
+    }
+
+    protected List<Map<String, Object>> getRecords(RecordReader reader) throws IOException, MalformedRecordException {
+        List<Map<String, Object>> records = new ArrayList<>();
+
+        Record record;
+        while ((record = reader.nextRecord()) != null) {
+            records.add(record.toMap());
+        }
+
+        return records;
+    }
+
+    protected void testChain(RecordSetWriterFactory writer2, RecordReaderFactory reader2) throws InitializationException {
+        testConversion(reader, readerConfigurer, writer2, null,
+                inputHandler,
+                mockFlowFile -> {
+                    try {
+                        testConversion(reader2, null, writer, writerConfigurer,
+                                testRunner -> testRunner.enqueue(mockFlowFile),
+                                resultHandler
+                        );
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    protected <R extends RecordReaderFactory, W extends RecordSetWriterFactory> void testConversion(
+            R reader,
+            Consumer<TestRunner> readerConfigurer,
+            W writer,
+            Consumer<TestRunner> writerConfigurer,
+            Consumer<TestRunner> inputHandler,
+            Consumer<MockFlowFile> resultHandler
+    ) throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+
+        String readerId = UUID.randomUUID().toString();
+        String writerId = UUID.randomUUID().toString();
+
+        runner.addControllerService(readerId, reader);
+        runner.addControllerService(writerId, writer);
+
+        Optional.ofNullable(readerConfigurer).ifPresent(_configurer -> _configurer.accept(runner));
+        Optional.ofNullable(writerConfigurer).ifPresent(_configurer -> _configurer.accept(runner));
+
+        runner.enableControllerService(reader);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, readerId);
+        runner.setProperty(ConvertRecord.RECORD_WRITER, writerId);
+
+        inputHandler.accept(runner);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+        resultHandler.accept(flowFile);
+    }
+}
ConversionWithExplicitSchemaIT.java (new file):

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroReaderWithExplicitSchema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.csv.CSVUtils;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+public class ConversionWithExplicitSchemaIT extends AbstractConversionIT {
+    private String schema;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConversions/explicit.schema.json")));
+    }
+
+    @Override
+    protected String csvPostfix() {
+        return "without_header.csv";
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return "json";
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return "without_schema.avro";
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return "xml";
+    }
+
+    @Override
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+    }
+
+    @Override
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(writer, "Schema Write Strategy", "no-schema");
+        testRunner.setProperty(writer, CSVUtils.INCLUDE_HEADER_LINE, "false");
+    }
+
+    @Override
+    protected List<Map<String, Object>> getRecords(byte[] avroData) throws IOException, MalformedRecordException {
+        Schema avroSchema = new Schema.Parser().parse(schema);
+        RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+
+        try (RecordReader reader = new AvroReaderWithExplicitSchema(new ByteArrayInputStream(avroData), recordSchema, avroSchema);) {
+            return getRecords(reader);
+        }
+    }
+}
ConversionWithSchemaInferenceIT.java (new file):

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+public class ConversionWithSchemaInferenceIT extends AbstractConversionIT {
+    @Override
+    protected String csvPostfix() {
+        return "with_header.csv";
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return "json";
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return "with_schema.avro";
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return "xml";
+    }
+
+    @Override
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+
+        // JSON schema inference doesn't discern INT and FLOAT but uses LONG and DOUBLE instead.
+        // So the expected avro is a little bit different as the deserialized values also end up in
+        // Long and Double objects
+        toAvro("with_schema.json.to.avro");
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+}
src/test/resources/TestConversions/data.int_float_string.json (new file):

@@ -0,0 +1,22 @@
+[ {
+  "Id" : 1,
+  "Int_Float_String" : 3
+}, {
+  "Id" : 2,
+  "Int_Float_String" : 3.75
+}, {
+  "Id" : 3,
+  "Int_Float_String" : 3.85
+}, {
+  "Id" : 4,
+  "Int_Float_String" : 8
+}, {
+  "Id" : 5,
+  "Int_Float_String" : 2.0
+}, {
+  "Id" : 6,
+  "Int_Float_String" : 4.0
+}, {
+  "Id" : 7,
+  "Int_Float_String" : "some_string"
+} ]
src/test/resources/TestConversions/data.int_float_string.with_header.csv (new file):

@@ -0,0 +1,8 @@
+Id,Int_Float_String
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
src/test/resources/TestConversions/data.int_float_string.with_schema.avro: Binary file not shown.
src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro: Binary file not shown.
src/test/resources/TestConversions/data.int_float_string.without_header.csv (new file):

@@ -0,0 +1,7 @@
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
src/test/resources/TestConversions/data.int_float_string.without_schema.avro: Binary file not shown.
src/test/resources/TestConversions/data.int_float_string.xml (new file):

@@ -0,0 +1,31 @@
+<?xml version="1.0" ?>
+<root>
+  <nifiRecord>
+    <Id>1</Id>
+    <Int_Float_String>3</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>2</Id>
+    <Int_Float_String>3.75</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>3</Id>
+    <Int_Float_String>3.85</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>4</Id>
+    <Int_Float_String>8</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>5</Id>
+    <Int_Float_String>2.0</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>6</Id>
+    <Int_Float_String>4.0</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>7</Id>
+    <Int_Float_String>some_string</Int_Float_String>
+  </nifiRecord>
+</root>
src/test/resources/TestConversions/explicit.schema.json (new file):

@@ -0,0 +1,23 @@
+{
+  "type":"record",
+  "name":"nifiRecord",
+  "namespace":"org.apache.nifi",
+  "fields":[
+    {
+      "name":"Id",
+      "type":[
+        "null",
+        "int"
+      ]
+    },
+    {
+      "name":"Int_Float_String",
+      "type":[
+        "int",
+        "float",
+        "string",
+        "null"
+      ]
+    }
+  ]
+}
FieldTypeInference.java:

@@ -23,6 +23,7 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;

 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;

 public class FieldTypeInference {
@@ -33,7 +34,7 @@ public class FieldTypeInference {
     // unique value for the data type, and so this paradigm allows us to avoid the cost of creating
     // and using the HashSet.
     private DataType singleDataType = null;
-    private Set<DataType> possibleDataTypes;
+    private Set<DataType> possibleDataTypes = new HashSet<>();

     public void addPossibleDataType(final DataType dataType) {
         if (dataType == null) {
@@ -45,7 +46,7 @@ public class FieldTypeInference {
             return;
         }

-        if (possibleDataTypes == null && singleDataType.equals(dataType)) {
+        if (singleDataType.equals(dataType) || possibleDataTypes.contains(dataType)) {
             return;
         }

@@ -62,36 +63,42 @@ public class FieldTypeInference {
             final RecordSchema newSchema = ((RecordDataType) dataType).getChildSchema();

             final RecordSchema mergedSchema = DataTypeUtils.merge(singleDataTypeSchema, newSchema);
+            possibleDataTypes.remove(singleDataType);
             singleDataType = RecordFieldType.RECORD.getRecordDataType(mergedSchema);
             return;
         }

-        if (singleFieldType.isWiderThan(additionalFieldType)) {
-            // Assigned type is already wide enough to encompass the given type
-            return;
+        if (possibleDataTypes.isEmpty()) {
+            possibleDataTypes.add(singleDataType);
         }

-        if (additionalFieldType.isWiderThan(singleFieldType)) {
-            // The given type is wide enough to encompass the assigned type. So changed the assigned type to the given type.
-            singleDataType = dataType;
-            return;
+        for (DataType possibleDataType : possibleDataTypes) {
+            RecordFieldType possibleFieldType = possibleDataType.getFieldType();
+            if (!possibleFieldType.equals(RecordFieldType.STRING) && possibleFieldType.isWiderThan(additionalFieldType)) {
+                return;
+            }
         }

-        if (possibleDataTypes == null) {
-            possibleDataTypes = new HashSet<>();
-            possibleDataTypes.add(singleDataType);
-            return;
+        Iterator<DataType> possibleDataTypeIterator = possibleDataTypes.iterator();
+        while (possibleDataTypeIterator.hasNext()) {
+            DataType possibleDataType = possibleDataTypeIterator.next();
+            RecordFieldType possibleFieldType = possibleDataType.getFieldType();
+
+            if (!additionalFieldType.equals(RecordFieldType.STRING) && additionalFieldType.isWiderThan(possibleFieldType)) {
+                possibleDataTypeIterator.remove();
+            }
         }

         possibleDataTypes.add(dataType);
     }


     /**
      * Creates a single DataType that represents the field
      * @return a single DataType that represents the field
      */
     public DataType toDataType() {
-        if (possibleDataTypes == null) {
+        if (possibleDataTypes.isEmpty()) {
             if (singleDataType == null) {
                 return DEFAULT_DATA_TYPE;
             }
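A sketch (not part of the commit; the class name is hypothetical) of the FieldTypeInference change, mirroring TestFieldTypeInference below: since STRING is no longer treated as wider than other types, INT, FLOAT and STRING now survive as a CHOICE instead of collapsing to STRING:

    import org.apache.nifi.schema.inference.FieldTypeInference;
    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;

    public class InferenceSketch {
        public static void main(String[] args) {
            FieldTypeInference inference = new FieldTypeInference();
            inference.addPossibleDataType(RecordFieldType.INT.getDataType());
            inference.addPossibleDataType(RecordFieldType.FLOAT.getDataType());
            inference.addPossibleDataType(RecordFieldType.STRING.getDataType());

            // Before: STRING swallowed the other types and datatypes were lost.
            // After: none of the three is wider than another, so the result is
            // a CHOICE of INT, FLOAT and STRING.
            DataType result = inference.toDataType();
            System.out.println(result);
        }
    }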
TestCSVSchemaInference.java:

@@ -71,7 +71,13 @@ public class TestCSVSchemaInference {
         assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), schema.getDataType("timestamp").get());
         assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), schema.getDataType("eventTime").get());
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("eventDate").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("maybeTime").get());
+        assertEquals(
+            RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                RecordFieldType.STRING.getDataType()
+            ),
+            schema.getDataType("maybeTime").get()
+        );
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("maybeDate").get());

         assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType());
@@ -118,7 +124,13 @@ public class TestCSVSchemaInference {
         assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), schema.getDataType("timestamp").get());
         assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), schema.getDataType("eventTime").get());
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("eventDate").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("maybeTime").get());
+        assertEquals(
+            RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                RecordFieldType.STRING.getDataType()
+            ),
+            schema.getDataType("maybeTime").get()
+        );
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("maybeDate").get());

         assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType());
TestWriteCSVResult.java:

@@ -117,7 +117,7 @@ public class TestWriteCSVResult {

         final String values = splits[1];
         final StringBuilder expectedBuilder = new StringBuilder();
-        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + dateValue + "\",\"" + timeValue + "\",\"" + timestampValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");

         final String expectedValues = expectedBuilder.toString();
TestInferJsonSchemaAccessStrategy.java:

@@ -153,7 +153,10 @@ public class TestInferJsonSchemaAccessStrategy {

         // TIME value and a STRING should be inferred as a STRING field
         final RecordField maybeTimeField = schema.getField("maybeTime").get();
-        assertEquals(RecordFieldType.STRING, maybeTimeField.getDataType().getFieldType());
+        assertEquals(
+            RecordFieldType.CHOICE.getChoiceDataType().getFieldType(),
+            maybeTimeField.getDataType().getFieldType()
+        );

         // DATE value and a null value should be inferred as a DATE field
         final RecordField maybeDateField = schema.getField("maybeDate").get();
@@ -169,7 +172,7 @@ public class TestInferJsonSchemaAccessStrategy {
         final RecordSchema schema = inferSchema(file);

         assertSame(RecordFieldType.STRING, schema.getDataType("name").get().getFieldType());
-        assertSame(RecordFieldType.STRING, schema.getDataType("age").get().getFieldType());
+        assertSame(RecordFieldType.CHOICE, schema.getDataType("age").get().getFieldType());

         final DataType valuesDataType = schema.getDataType("values").get();
         assertSame(RecordFieldType.CHOICE, valuesDataType.getFieldType());
@@ -178,7 +181,10 @@ public class TestInferJsonSchemaAccessStrategy {
         final List<DataType> possibleTypes = valuesChoiceType.getPossibleSubTypes();
         assertEquals(2, possibleTypes.size());
         assertTrue(possibleTypes.contains(RecordFieldType.STRING.getDataType()));
-        assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+        assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.LONG.getDataType(),
+            RecordFieldType.STRING.getDataType()
+        ))));

         assertSame(RecordFieldType.STRING, schema.getDataType("nullValue").get().getFieldType());
     }
TestFieldTypeInference.java (new file):

@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.inference;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static com.google.common.collect.Collections2.permutations;
+import static org.junit.Assert.assertEquals;
+
+public class TestFieldTypeInference {
+    private FieldTypeInference testSubject;
+
+    @Before
+    public void setUp() throws Exception {
+        testSubject = new FieldTypeInference();
+    }
+
+    @Test
+    public void testToDataTypeWith_SHORT_INT_LONG_shouldReturn_LONG() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.SHORT.getDataType(),
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.LONG.getDataType()
+        );
+
+        DataType expected = RecordFieldType.LONG.getDataType();
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_ShouldReturn_INT_FLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_STRING_shouldReturn_INT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_STRING_shouldReturn_INT_FLOAT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWithMultipleRecord() {
+        // GIVEN
+        String fieldName = "fieldName";
+        DataType fieldType1 = RecordFieldType.INT.getDataType();
+        DataType fieldType2 = RecordFieldType.FLOAT.getDataType();
+        DataType fieldType3 = RecordFieldType.STRING.getDataType();
+
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType1)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType3)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2))
+        );
+
+        DataType expected = RecordFieldType.RECORD.getRecordDataType(createRecordSchema(
+                fieldName,
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        fieldType1,
+                        fieldType2,
+                        fieldType3
+                )
+        ));
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
+        return new SimpleRecordSchema(Arrays.asList(
+                new RecordField(fieldName, fieldType)
+        ));
+    }
+
+    private <I, E> void runWithAllPermutations(BiFunction<List<I>, E, ?> test, List<I> input, E expected) {
+        permutations(input).forEach(inputPermutation -> test.apply(inputPermutation, expected));
+    }
+
+    private Void testToDataTypeShouldReturnChoice(List<DataType> dataTypes, Set<DataType> expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, new HashSet<>(((ChoiceDataType) actual).getPossibleSubTypes()));
+
+        return null;
+    }
+
+    private Void testToDataTypeShouldReturnSingleType(List<DataType> dataTypes, DataType expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, actual);
+
+        return null;
+    }
+}
TestInferXmlSchema.java:

@@ -85,7 +85,13 @@ public class TestInferXmlSchema {
         assertSame(RecordFieldType.STRING, schema.getDataType("COUNTRY").get().getFieldType());

         assertEquals(RecordFieldType.DATE.getDataType(timeValueInference.getDateFormat()), schema.getDataType("DOB").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("TOB").get());
+        assertEquals(
+            RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                RecordFieldType.STRING.getDataType()
+            ),
+            schema.getDataType("TOB").get()
+        );
         assertEquals(RecordFieldType.TIMESTAMP.getDataType(timeValueInference.getTimestampFormat()), schema.getDataType("TSOB").get());

         final DataType addressDataType = schema.getDataType("ADDRESS").get();
@@ -7,9 +7,9 @@
   "bigint" : 8,
   "float" : 8.0,
   "double" : 8.0,
+  "timestamp" : "2017-01-01 17:00:00",
   "date" : "2017-01-01",
   "time" : "17:00:00",
-  "timestamp" : "2017-01-01 17:00:00",
   "char" : "c",
   "string" : "string",
   "record" : null,