From 34112519c2dde19d704ef624e62e51b399cf1ce7 Mon Sep 17 00:00:00 2001
From: Tamas Palfy
Date: Fri, 13 Sep 2019 16:39:24 +0200
Subject: [PATCH] NIFI-6640 - UNION/CHOICE types not handled correctly

3 important changes:

1. FieldTypeInference had a bug when dealing with multiple datatypes for the
same field where some (but not all) of them were in a wider-than-the-other
relationship.
Before: some datatypes could be lost; String was considered wider than any
other type.
After: consistent behaviour; String is NOT considered wider than any other
type.

2. Choosing a datatype for a value from a ChoiceDataType:
Before, it chose the first compatible datatype as the basis of conversion.
After the change, it tries to find the most suitable datatype.

3. Conversion of a value of an Avro union type:
Before, it chose the first compatible datatype as the basis of conversion.
After the change, it tries to find the most suitable datatype.

Change: in the RecordFieldType enum, TIMESTAMP was moved ahead of DATE.

This closes #3724.

Signed-off-by: Mark Payne
---
 .../serialization/record/RecordFieldType.java |  10 +-
 .../record/util/DataTypeUtils.java            |  93 ++++-
 .../record/TestDataTypeUtils.java             | 224 ++++++++++
 .../org/apache/nifi/avro/AvroTypeUtil.java    |  11 +
 .../apache/nifi/avro/TestAvroTypeUtil.java    |  53 +++
 .../nifi-standard-processors/pom.xml          |   8 +
 .../standard/AbstractConversionIT.java        | 388 ++++++++++++++++++
 .../ConversionWithExplicitSchemaIT.java       |  88 ++++
 .../ConversionWithSchemaInferenceIT.java      |  51 +++
 .../data.int_float_string.json                |  22 +
 .../data.int_float_string.with_header.csv     |   8 +
 .../data.int_float_string.with_schema.avro    | Bin 0 -> 302 bytes
 ....int_float_string.with_schema.json.to.avro | Bin 0 -> 322 bytes
 .../data.int_float_string.without_header.csv  |   7 +
 .../data.int_float_string.without_schema.avro | Bin 0 -> 51 bytes
 .../TestConversions/data.int_float_string.xml |  31 ++
 .../TestConversions/explicit.schema.json      |  23 ++
 .../schema/inference/FieldTypeInference.java  |  47 ++-
 .../nifi/csv/TestCSVSchemaInference.java      |  16 +-
 .../apache/nifi/csv/TestWriteCSVResult.java   |   2 +-
 .../TestInferJsonSchemaAccessStrategy.java    |  12 +-
 .../inference/TestFieldTypeInference.java     | 182 ++++++++
 .../apache/nifi/xml/TestInferXmlSchema.java   |   8 +-
 .../test/resources/json/output/dataTypes.json |   2 +-
 24 files changed, 1240 insertions(+), 46 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index de9aa58a42..413c1281ee 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -72,6 +72,11 @@ public enum RecordFieldType {
      */
     DOUBLE("double", FLOAT),
 
+    /**
+     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+     */
+    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
     /**
      * A date field type. Fields of this type use a {@code java.sql.Date} value.
      */
@@ -82,11 +87,6 @@ public enum RecordFieldType {
      */
     TIME("time", "HH:mm:ss"),
 
-    /**
-     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
-     */
-    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
-
     /**
      * A char field type. Fields of this type use a {@code char} value.
      */
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 0686dcf872..308cafa5ca 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
 import java.io.Reader;
+import java.lang.reflect.Array;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -47,16 +48,21 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -225,17 +231,75 @@
     }
 
     public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
-        for (final DataType subType : choiceType.getPossibleSubTypes()) {
-            if (isCompatibleDataType(value, subType)) {
-                if (subType.getFieldType() == RecordFieldType.CHOICE) {
-                    return chooseDataType(value, (ChoiceDataType) subType);
-                }
+        Queue<DataType> possibleSubTypes = new LinkedList<>(choiceType.getPossibleSubTypes());
+        List<DataType> compatibleSimpleSubTypes = new ArrayList<>();
 
-                return subType;
+        DataType subType;
+        while ((subType = possibleSubTypes.poll()) != null) {
+            if (subType instanceof ChoiceDataType) {
+                possibleSubTypes.addAll(((ChoiceDataType) subType).getPossibleSubTypes());
+            } else {
+                if (isCompatibleDataType(value, subType)) {
+                    compatibleSimpleSubTypes.add(subType);
+                }
             }
         }
 
-        return null;
+        int nrOfCompatibleSimpleSubTypes = compatibleSimpleSubTypes.size();
+
+        final DataType chosenSimpleType;
+        if (nrOfCompatibleSimpleSubTypes == 0) {
+            chosenSimpleType = null;
+        } else if (nrOfCompatibleSimpleSubTypes == 1) {
+            chosenSimpleType = compatibleSimpleSubTypes.get(0);
+        } else {
+            chosenSimpleType = findMostSuitableType(value, compatibleSimpleSubTypes, Function.identity())
+                .orElse(compatibleSimpleSubTypes.get(0));
+        }
+
+        return chosenSimpleType;
+    }
+
+    public static <T> Optional<T> findMostSuitableType(Object value, List<T> types, Function<T, DataType> dataTypeMapper) {
+        if (value instanceof String) {
+            return findMostSuitableTypeByStringValue((String) value, types, dataTypeMapper);
+        } else {
+            DataType inferredDataType = inferDataType(value, null);
+
+            if (inferredDataType != null && !inferredDataType.getFieldType().equals(RecordFieldType.STRING)) {
+                for (T type : types) {
+                    if (inferredDataType.equals(dataTypeMapper.apply(type))) {
+                        return Optional.of(type);
+                    }
+                }
+
+                for (T type : types) {
+                    if (getWiderType(dataTypeMapper.apply(type), inferredDataType).isPresent()) {
+                        return Optional.of(type);
+                    }
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    public static <T> Optional<T> findMostSuitableTypeByStringValue(String valueAsString, List<T> types, Function<T, DataType> dataTypeMapper) {
+        // Sorting based on the RecordFieldType enum ordering looks appropriate here as we want simpler types
+        // first and the enum's ordering seems to reflect that
+        Collections.sort(types, Comparator.comparing(type -> dataTypeMapper.apply(type).getFieldType()));
+
+        for (T type : types) {
+            try {
+                if (isCompatibleDataType(valueAsString, dataTypeMapper.apply(type))) {
+                    return Optional.of(type);
+                }
+            } catch (Exception e) {
+                logger.error("Exception thrown while checking if '" + valueAsString + "' is compatible with '" + type + "'", e);
+            }
+        }
+
+        return Optional.empty();
     }
 
     public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
@@ -440,12 +504,12 @@
             // final DataType elementDataType = inferDataType(valueFromMap, RecordFieldType.STRING.getDataType());
             // return RecordFieldType.MAP.getMapDataType(elementDataType);
         }
-        if (value instanceof Object[]) {
-            final Object[] array = (Object[]) value;
-
+        if (value.getClass().isArray()) {
             DataType mergedDataType = null;
-            for (final Object arrayValue : array) {
-                final DataType inferredDataType = inferDataType(arrayValue, RecordFieldType.STRING.getDataType());
+
+            int length = Array.getLength(value);
+            for (int index = 0; index < length; index++) {
+                final DataType inferredDataType = inferDataType(Array.get(value, index), RecordFieldType.STRING.getDataType());
                 mergedDataType = mergeDataTypes(mergedDataType, inferredDataType);
             }
@@ -1545,7 +1609,10 @@
             possibleTypes.add(otherDataType);
         }
 
-        return RecordFieldType.CHOICE.getChoiceDataType(new ArrayList<>(possibleTypes));
+        ArrayList<DataType> possibleChildTypes = new ArrayList<>(possibleTypes);
+        Collections.sort(possibleChildTypes, Comparator.comparing(DataType::getFieldType));
+
+        return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
     }
 }
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 89a0490f58..30b2a60ab3 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -18,6 +18,7 @@ package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.Test;
@@ -27,10 +28,17 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -324,4 +332,220 @@ public class TestDataTypeUtils {
         }
         assertNotNull(e);
     }
+
+    @Test
+    public void testFindMostSuitableTypeByStringValueShouldReturnEvenWhenOneTypeThrowsException() {
+        String valueAsString = "value";
+
+        String nonMatchingType = "nonMatchingType";
+        String throwsExceptionType = "throwsExceptionType";
+        String matchingType = "matchingType";
+
+        List<String> types = Arrays.asList(
+                nonMatchingType,
+                throwsExceptionType,
+                matchingType
+        );
+        Optional<String> expected = Optional.of(matchingType);
+
+        AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+        Function<String, DataType> dataTypeMapper = type -> {
+            if (type.equals(nonMatchingType)) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            } else if (type.equals(throwsExceptionType)) {
+                return new DataType(RecordFieldType.DATE, null) {
+                    @Override
+                    public String getFormat() {
+                        exceptionThrown.set(true);
+                        throw new RuntimeException("matching error");
+                    }
+                };
+            } else if (type.equals(matchingType)) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            return null;
+        };
+
+        Optional<String> actual = DataTypeUtils.findMostSuitableTypeByStringValue(valueAsString, types, dataTypeMapper);
+        assertTrue("Exception not thrown during test as intended.", exceptionThrown.get());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenInt_vs_INT_FLOAT_ThenShouldReturnINT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1;
+        DataType expected = RecordFieldType.INT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenFloat_vs_INT_FLOAT_ThenShouldReturnFLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenHasChoiceThenShouldReturnSingleMatchingFromChoice() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType(),
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.FLOAT.getDataType(),
+                        RecordFieldType.STRING.getDataType()
+                )
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    private void testChooseDataTypeAlsoReverseTypes(Object value, List<DataType> dataTypes, DataType expected) {
+        testChooseDataType(dataTypes, value, expected);
+        Collections.reverse(dataTypes);
+        testChooseDataType(dataTypes, value, expected);
+    }
+
+    private void testChooseDataType(List<DataType> dataTypes, Object value, DataType expected) {
+        // GIVEN
+        ChoiceDataType choiceDataType = (ChoiceDataType) RecordFieldType.CHOICE.getChoiceDataType(dataTypes.toArray(new DataType[dataTypes.size()]));
+
+        // WHEN
+        DataType actual = DataTypeUtils.chooseDataType(value, choiceDataType);
+
+        // THEN
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithBoolean() {
+        testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithByte() {
+        testFindMostSuitableType(Byte.valueOf((byte) 123), RecordFieldType.BYTE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithShort() {
+        testFindMostSuitableType(Short.valueOf((short) 123), RecordFieldType.SHORT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithInt() {
+        testFindMostSuitableType(123, RecordFieldType.INT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithLong() {
+        testFindMostSuitableType(123L,
RecordFieldType.LONG.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithBigInt() { + testFindMostSuitableType(BigInteger.valueOf(123L), RecordFieldType.BIGINT.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithFloat() { + testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithDouble() { + testFindMostSuitableType(12.3, RecordFieldType.DOUBLE.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithDate() { + testFindMostSuitableType("1111-11-11", RecordFieldType.DATE.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithTime() { + testFindMostSuitableType("11:22:33", RecordFieldType.TIME.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithTimeStamp() { + testFindMostSuitableType("1111-11-11 11:22:33", RecordFieldType.TIMESTAMP.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithChar() { + testFindMostSuitableType('a', RecordFieldType.CHAR.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithStringShouldReturnChar() { + testFindMostSuitableType("abc", RecordFieldType.CHAR.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithString() { + testFindMostSuitableType("abc", RecordFieldType.STRING.getDataType(), RecordFieldType.CHAR.getDataType()); + } + + @Test + public void testFindMostSuitableTypeWithArray() { + testFindMostSuitableType(new int[]{1, 2, 3}, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())); + } + + private void testFindMostSuitableType(Object value, DataType expected, DataType... filtered) { + List filteredOutDataTypes = Arrays.stream(filtered).collect(Collectors.toList()); + + // GIVEN + List unexpectedTypes = Arrays.stream(RecordFieldType.values()) + .flatMap(recordFieldType -> { + Stream dataTypeStream; + + if (RecordFieldType.ARRAY.equals(recordFieldType)) { + dataTypeStream = Arrays.stream(RecordFieldType.values()).map(elementType -> RecordFieldType.ARRAY.getArrayDataType(elementType.getDataType())); + } else { + dataTypeStream = Stream.of(recordFieldType.getDataType()); + } + + return dataTypeStream; + }) + .filter(dataType -> !dataType.equals(expected)) + .filter(dataType -> !filteredOutDataTypes.contains(dataType)) + .collect(Collectors.toList()); + + IntStream.rangeClosed(0, unexpectedTypes.size()).forEach(insertIndex -> { + List allTypes = new LinkedList<>(unexpectedTypes); + allTypes.add(insertIndex, expected); + + // WHEN + Optional actual = DataTypeUtils.findMostSuitableType(value, allTypes, Function.identity()); + + // THEN + assertEquals(Optional.ofNullable(expected), actual); + }); + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 2bc95bc5c3..a0eea8b1c4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -74,6 +74,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; public class AvroTypeUtil { private static final Logger logger = 
LoggerFactory.getLogger(AvroTypeUtil.class);
@@ -877,6 +878,16 @@ public class AvroTypeUtil {
      */
     private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) {
         boolean foundNonNull = false;
+
+        Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+                originalValue,
+                fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
+                subSchema -> AvroTypeUtil.determineDataType(subSchema)
+        );
+        if (mostSuitableType.isPresent()) {
+            return conversion.apply(mostSuitableType.get());
+        }
+
         for (final Schema subSchema : fieldSchema.getTypes()) {
             if (subSchema.getType() == Type.NULL) {
                 continue;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 0ecbe25257..a89ebe479c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -587,4 +588,56 @@
             assertNotNull(((Record)inner).get("Message"));
         }
     }
+
+    @Test
+    public void testConvertToAvroObjectWhenIntVSUnion_INT_FLOAT_ThenReturnInt() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Integer rawValue = 1;
+
+        Object expected = 1;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, schemaTypes);
+    }
+
+    @Test
+    public void testConvertToAvroObjectWhenFloatVSUnion_INT_FLOAT_ThenReturnFloat() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Float rawValue = 1.5f;
+
+        Object expected = 1.5f;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, schemaTypes);
+    }
+
+    private void testConvertToAvroObjectAlsoReverseSchemaList(Object expected, Object rawValue, List<Schema.Type> schemaTypes) {
+        // GIVEN
+        List<Schema> schemaList = schemaTypes.stream()
+                .map(Schema::create)
+                .collect(Collectors.toList());
+
+        // WHEN
+        Object actual = AvroTypeUtil.convertToAvroObject(rawValue, Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actual);
+
+        // WHEN
+        Collections.reverse(schemaList);
+        Object actualAfterReverse = AvroTypeUtil.convertToAvroObject(rawValue, Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actualAfterReverse);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 0b15be05fe..ed51c40805 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -411,6 +411,14 @@
                         <exclude>src/test/resources/TestMergeContent/head</exclude>
                         <exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
                         <exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.json</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_header.csv</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.without_header.csv</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.xml</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/data.int_float_string.without_schema.avro</exclude>
+                        <exclude>src/test/resources/TestConversions/explicit.schema.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
new file mode 100644
index 0000000000..9b458a228d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.avro.AvroReader; +import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema; +import org.apache.nifi.avro.AvroRecordSetWriter; +import org.apache.nifi.csv.CSVReader; +import org.apache.nifi.csv.CSVRecordSetWriter; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.xml.XMLReader; +import org.apache.nifi.xml.XMLRecordSetWriter; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; + +public abstract class AbstractConversionIT { + protected RecordReaderFactory reader; + protected Consumer inputHandler; + protected Consumer readerConfigurer; + + protected RecordSetWriterFactory writer; + protected Consumer resultHandler; + protected Consumer writerConfigurer; + + @Before + public void setUp() throws Exception { + reader = null; + inputHandler = null; + readerConfigurer = null; + + writer = null; + resultHandler = null; + writerConfigurer = null; + } + + @Test + public void testCsvToJson() throws Exception { + fromCsv(csvPostfix()); + toJson(jsonPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testCsvToAvro() throws Exception { + fromCsv(csvPostfix()); + toAvro(avroPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testCsvToAvroToCsv() throws Exception { + fromCsv(csvPostfix()); + + AvroRecordSetWriter writer2 = new AvroRecordSetWriter(); + AvroReader reader2 = new AvroReader(); + + toCsv(csvPostfix()); + + testChain(writer2, reader2); + } + + @Test + public void testCsvToXml() throws Exception { + fromCsv(csvPostfix()); + toXml(xmlPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testJsonToCsv() throws Exception { + fromJson(jsonPostfix()); + toCsv(csvPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testJsonToAvro() throws Exception { + fromJson(jsonPostfix()); + toAvro(avroPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testJsonToAvroToJson() throws Exception { + fromJson(jsonPostfix()); + + AvroRecordSetWriter writer2 = new AvroRecordSetWriter(); + AvroReader reader2 = new AvroReader(); + + toJson(jsonPostfix()); + + testChain(writer2, reader2); + } + + @Test + public void testAvroToCsv() throws Exception { + fromAvro(avroPostfix()); + toCsv(csvPostfix()); + + testConversion(reader, readerConfigurer, 
writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testAvroToJson() throws Exception { + fromAvro(avroPostfix()); + toJson(jsonPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testAvroToXml() throws Exception { + fromAvro(avroPostfix()); + toXml(xmlPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testXmlToCsv() throws Exception { + fromXml(xmlPostfix()); + toCsv(csvPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testXmlToJson() throws Exception { + fromXml(xmlPostfix()); + toJson(jsonPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testXmlToAvro() throws Exception { + fromXml(xmlPostfix()); + toAvro(avroPostfix()); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } + + @Test + public void testXmlToAvroToXml() throws Exception { + fromXml(xmlPostfix()); + + AvroRecordSetWriter writer2 = new AvroRecordSetWriter(); + AvroReader reader2 = new AvroReader(); + + toXml(xmlPostfix()); + + testChain(writer2, reader2); + } + + abstract protected String csvPostfix(); + + abstract protected String jsonPostfix(); + + abstract protected String avroPostfix(); + + abstract protected String xmlPostfix(); + + protected void commonReaderConfiguration(TestRunner testRunner) { + } + + protected void commonWriterConfiguration(TestRunner testRunner) { + } + + protected void fromCsv(String postfix) { + reader = new CSVReader(); + inputHandler = stringInputHandler(getContent(postfix)); + + readerConfigurer = testRunner -> { + commonReaderConfiguration(testRunner); + }; + } + + protected void fromJson(String postfix) { + reader = new JsonTreeReader(); + inputHandler = stringInputHandler(getContent(postfix)); + + readerConfigurer = testRunner -> { + commonReaderConfiguration(testRunner); + }; + } + + protected void fromXml(String postfix) { + reader = new XMLReader(); + inputHandler = stringInputHandler(getContent(postfix)); + + readerConfigurer = testRunner -> { + commonReaderConfiguration(testRunner); + testRunner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY); + }; + } + + protected void fromAvro(String postfix) { + reader = new AvroReader(); + inputHandler = byteInputHandler(getByteContent(postfix)); + + readerConfigurer = testRunner -> { + commonReaderConfiguration(testRunner); + }; + } + + protected void toCsv(String postfix) { + writer = new CSVRecordSetWriter(); + resultHandler = stringOutputHandler(getContent(postfix)); + + writerConfigurer = testRunner -> { + commonWriterConfiguration(testRunner); + }; + } + + protected void toJson(String postfix) { + writer = new JsonRecordSetWriter(); + resultHandler = stringOutputHandler(getContent(postfix)); + + writerConfigurer = testRunner -> { + commonWriterConfiguration(testRunner); + testRunner.setProperty(writer, "Pretty Print JSON", "true"); + }; + } + + protected void toXml(String postfix) { + writer = new XMLRecordSetWriter(); + resultHandler = stringOutputHandler(getContent(postfix)); + + writerConfigurer = testRunner -> { + commonWriterConfiguration(testRunner); + testRunner.setProperty(writer, "pretty_print_xml", "true"); + testRunner.setProperty(writer, "root_tag_name", "root"); + 
testRunner.setProperty(writer, "record_tag_name", "nifiRecord"); + }; + } + + protected void toAvro(String postfix) { + writer = new AvroRecordSetWriter(); + resultHandler = mockFlowFile -> { + try { + List> expected = getRecords(getByteContent(postfix)); + List> actual = getRecords(mockFlowFile.toByteArray()); + + assertEquals(expected, actual); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + writerConfigurer = testRunner -> { + commonWriterConfiguration(testRunner); + }; + } + + protected Consumer stringInputHandler(String input) { + return testRunner -> testRunner.enqueue(input); + } + + protected Consumer byteInputHandler(byte[] input) { + return testRunner -> testRunner.enqueue(input); + } + + protected Consumer stringOutputHandler(String expected) { + return mockFlowFile -> mockFlowFile.assertContentEquals(expected); + } + + protected String getContent(String postfix) { + return new String(getByteContent(postfix)); + } + + protected byte[] getByteContent(String postfix) { + try { + return Files.readAllBytes(Paths.get("src/test/resources/TestConversions/data.int_float_string." + postfix)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected List> getRecords(byte[] avroData) throws IOException, MalformedRecordException { + try (RecordReader reader = new AvroReaderWithEmbeddedSchema(new ByteArrayInputStream(avroData));) { + return getRecords(reader); + } + } + + protected List> getRecords(RecordReader reader) throws IOException, MalformedRecordException { + List> records = new ArrayList<>(); + + Record record; + while ((record = reader.nextRecord()) != null) { + records.add(record.toMap()); + } + + return records; + } + + protected void testChain(RecordSetWriterFactory writer2, RecordReaderFactory reader2) throws InitializationException { + testConversion(reader, readerConfigurer, writer2, null, + inputHandler, + mockFlowFile -> { + try { + testConversion(reader2, null, writer, writerConfigurer, + testRunner -> testRunner.enqueue(mockFlowFile), + resultHandler + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + protected void testConversion( + R reader, + Consumer readerConfigurer, + W writer, + Consumer writerConfigurer, + Consumer inputHandler, + Consumer resultHandler + ) throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + + String readerId = UUID.randomUUID().toString(); + String writerId = UUID.randomUUID().toString(); + + runner.addControllerService(readerId, reader); + runner.addControllerService(writerId, writer); + + Optional.ofNullable(readerConfigurer).ifPresent(_configurer -> _configurer.accept(runner)); + Optional.ofNullable(writerConfigurer).ifPresent(_configurer -> _configurer.accept(runner)); + + runner.enableControllerService(reader); + runner.enableControllerService(writer); + + runner.setProperty(ConvertRecord.RECORD_READER, readerId); + runner.setProperty(ConvertRecord.RECORD_WRITER, writerId); + + inputHandler.accept(runner); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + + resultHandler.accept(flowFile); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java new file mode 100644 index 0000000000..81467e33f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroReaderWithExplicitSchema; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.csv.CSVUtils; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.TestRunner; +import org.junit.Before; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +public class ConversionWithExplicitSchemaIT extends AbstractConversionIT { + private String schema; + + @Before + public void setUp() throws Exception { + super.setUp(); + + schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConversions/explicit.schema.json"))); + } + + @Override + protected String csvPostfix() { + return "without_header.csv"; + } + + @Override + protected String jsonPostfix() { + return "json"; + } + + @Override + protected String avroPostfix() { + return "without_schema.avro"; + } + + @Override + protected String xmlPostfix() { + return "xml"; + } + + @Override + protected void commonReaderConfiguration(TestRunner testRunner) { + testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, schema); + } + + @Override + protected void commonWriterConfiguration(TestRunner testRunner) { + testRunner.setProperty(writer, "Schema Write Strategy", "no-schema"); + testRunner.setProperty(writer, CSVUtils.INCLUDE_HEADER_LINE, "false"); + } + + @Override + protected List> getRecords(byte[] avroData) throws IOException, MalformedRecordException { + Schema avroSchema = new Schema.Parser().parse(schema); + RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + + try (RecordReader reader = new AvroReaderWithExplicitSchema(new ByteArrayInputStream(avroData), recordSchema, avroSchema);) { + return getRecords(reader); + } + } +} diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java new file mode 100644 index 0000000000..bed820c24b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +public class ConversionWithSchemaInferenceIT extends AbstractConversionIT { + @Override + protected String csvPostfix() { + return "with_header.csv"; + } + + @Override + protected String jsonPostfix() { + return "json"; + } + + @Override + protected String avroPostfix() { + return "with_schema.avro"; + } + + @Override + protected String xmlPostfix() { + return "xml"; + } + + @Override + public void testJsonToAvro() throws Exception { + fromJson(jsonPostfix()); + + // JSON schema inference doesn't discern INT and FLOAT but uses LONG and DOUBLE instead. 
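+    // (For instance, the sample value 3 in the test data is then inferred as LONG and 3.75 as DOUBLE,
+    // rather than the INT and FLOAT declared in explicit.schema.json.)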
+ // So the expected avro is a little bit different as the deserialized values also end up in + // Long and Double objects + toAvro("with_schema.json.to.avro"); + + testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json new file mode 100644 index 0000000000..1971dcfd55 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json @@ -0,0 +1,22 @@ +[ { + "Id" : 1, + "Int_Float_String" : 3 +}, { + "Id" : 2, + "Int_Float_String" : 3.75 +}, { + "Id" : 3, + "Int_Float_String" : 3.85 +}, { + "Id" : 4, + "Int_Float_String" : 8 +}, { + "Id" : 5, + "Int_Float_String" : 2.0 +}, { + "Id" : 6, + "Int_Float_String" : 4.0 +}, { + "Id" : 7, + "Int_Float_String" : "some_string" +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv new file mode 100644 index 0000000000..d9972884b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv @@ -0,0 +1,8 @@ +Id,Int_Float_String +1,3 +2,3.75 +3,3.85 +4,8 +5,2.0 +6,4.0 +7,some_string diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro new file mode 100644 index 0000000000000000000000000000000000000000..3c18077c1e68bedaf271e8f3c00333f27d13fe30 GIT binary patch literal 302 zcmeZI%3@>@ODrqO*DFrWNX<=r#Z;|SQdy9yWTjM;nw(#hqNJmgmzWFU=Vhj41|f?T z7bGTwB=U>W^%8;Xj8r|48laA}%+#EeVkN8SYM5qEkU*(%)An%*jgQg zn!J*Dx19XMlK9|~qRhN>Bpo27Kx09YK)M(v4K^sYHnvs-?1kj~l+ N^K(<@ODrqO*DFrWNXW^%8;Xj8r|48laA}%+#EeVkN8SYM5qEkU*(ocz3WrPx{> zgrdBXc(Vt&UPkerZxpDp0%_rT}bKY;A0<2-p|N`6;Q%93Vjk zxf9pr)uQ^Z9r>A|x-8)1IleYVCME_pCKe_JFyM1wVq-dc1_Xdi4h8`xE~pHH0}~IF UDd51w$0AmopPL#Fu@c=>09>kOc>n+a literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv new file mode 100644 index 0000000000..65cf3657fa --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv @@ -0,0 +1,7 @@ +1,3 +2,3.75 +3,3.85 +4,8 +5,2.0 +6,4.0 +7,some_string diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro new file mode 100644 index 0000000000000000000000000000000000000000..c800fe6c912583db76e27dcfe0fb161ecef1c966 GIT binary patch literal 51 
zcmZQ#VqjxpVPaq?aA0C%N=qwqVB%m9VB!LbFgP&r0I3ECCO#Ii;{4py_~MeH%)E2} Djob)Z literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml new file mode 100644 index 0000000000..36a9b10bdc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml @@ -0,0 +1,31 @@ + + + + 1 + 3 + + + 2 + 3.75 + + + 3 + 3.85 + + + 4 + 8 + + + 5 + 2.0 + + + 6 + 4.0 + + + 7 + some_string + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json new file mode 100644 index 0000000000..5c578b06ec --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json @@ -0,0 +1,23 @@ +{ + "type":"record", + "name":"nifiRecord", + "namespace":"org.apache.nifi", + "fields":[ + { + "name":"Id", + "type":[ + "null", + "int" + ] + }, + { + "name":"Int_Float_String", + "type":[ + "int", + "float", + "string", + "null" + ] + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java index e148baf955..1f52cb8357 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java @@ -23,6 +23,7 @@ import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; public class FieldTypeInference { @@ -33,7 +34,7 @@ public class FieldTypeInference { // unique value for the data type, and so this paradigm allows us to avoid the cost of creating // and using the HashSet. 
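    // A minimal sketch of the intended behaviour (it mirrors the new TestFieldTypeInference
    // expectations further below): with the reworked merging, adding INT, FLOAT and STRING in any
    // order now yields CHOICE(INT, FLOAT, STRING), whereas previously STRING was treated as wider
    // than the other types and the numeric datatypes could be lost.
    //
    //   FieldTypeInference inference = new FieldTypeInference();
    //   inference.addPossibleDataType(RecordFieldType.INT.getDataType());
    //   inference.addPossibleDataType(RecordFieldType.STRING.getDataType());
    //   inference.addPossibleDataType(RecordFieldType.FLOAT.getDataType());
    //   DataType merged = inference.toDataType(); // => CHOICE(INT, FLOAT, STRING)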
     private DataType singleDataType = null;
-    private Set<DataType> possibleDataTypes;
+    private Set<DataType> possibleDataTypes = new HashSet<>();
 
     public void addPossibleDataType(final DataType dataType) {
         if (dataType == null) {
@@ -45,7 +46,7 @@
             return;
         }
 
-        if (possibleDataTypes == null && singleDataType.equals(dataType)) {
+        if (singleDataType.equals(dataType) || possibleDataTypes.contains(dataType)) {
             return;
         }
 
@@ -62,36 +63,42 @@
             final RecordSchema newSchema = ((RecordDataType) dataType).getChildSchema();
             final RecordSchema mergedSchema = DataTypeUtils.merge(singleDataTypeSchema, newSchema);
 
+            possibleDataTypes.remove(singleDataType);
             singleDataType = RecordFieldType.RECORD.getRecordDataType(mergedSchema);
-            return;
-        }
-
-        if (singleFieldType.isWiderThan(additionalFieldType)) {
-            // Assigned type is already wide enough to encompass the given type
-            return;
-        }
-
-        if (additionalFieldType.isWiderThan(singleFieldType)) {
-            // The given type is wide enough to encompass the assigned type. So change the assigned type to the given type.
-            singleDataType = dataType;
-            return;
-        }
-
-        if (possibleDataTypes == null) {
-            possibleDataTypes = new HashSet<>();
             possibleDataTypes.add(singleDataType);
+            return;
+        }
+
+        if (possibleDataTypes.isEmpty()) {
+            possibleDataTypes.add(singleDataType);
+        }
+
+        for (DataType possibleDataType : possibleDataTypes) {
+            RecordFieldType possibleFieldType = possibleDataType.getFieldType();
+            if (!possibleFieldType.equals(RecordFieldType.STRING) && possibleFieldType.isWiderThan(additionalFieldType)) {
+                return;
+            }
+        }
+
+        Iterator<DataType> possibleDataTypeIterator = possibleDataTypes.iterator();
+        while (possibleDataTypeIterator.hasNext()) {
+            DataType possibleDataType = possibleDataTypeIterator.next();
+            RecordFieldType possibleFieldType = possibleDataType.getFieldType();
+
+            if (!additionalFieldType.equals(RecordFieldType.STRING) && additionalFieldType.isWiderThan(possibleFieldType)) {
+                possibleDataTypeIterator.remove();
+            }
         }
 
         possibleDataTypes.add(dataType);
     }
-
     /**
      * Creates a single DataType that represents the field
      * @return a single DataType that represents the field
      */
     public DataType toDataType() {
-        if (possibleDataTypes == null) {
+        if (possibleDataTypes.isEmpty()) {
             if (singleDataType == null) {
                 return DEFAULT_DATA_TYPE;
             }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
index 9dc8f29ce4..b8d66851b4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
@@ -71,7 +71,13 @@
         assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), schema.getDataType("timestamp").get());
         assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), schema.getDataType("eventTime").get());
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("eventDate").get());
-        assertEquals(RecordFieldType.STRING.getDataType(),
schema.getDataType("maybeTime").get()); + assertEquals( + RecordFieldType.CHOICE.getChoiceDataType( + RecordFieldType.TIME.getDataType("HH:mm:ss"), + RecordFieldType.STRING.getDataType() + ), + schema.getDataType("maybeTime").get() + ); assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("maybeDate").get()); assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType()); @@ -118,7 +124,13 @@ public class TestCSVSchemaInference { assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), schema.getDataType("timestamp").get()); assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), schema.getDataType("eventTime").get()); assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("eventDate").get()); - assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("maybeTime").get()); + assertEquals( + RecordFieldType.CHOICE.getChoiceDataType( + RecordFieldType.TIME.getDataType("HH:mm:ss"), + RecordFieldType.STRING.getDataType() + ), + schema.getDataType("maybeTime").get() + ); assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), schema.getDataType("maybeDate").get()); assertSame(RecordFieldType.INT, schema.getDataType("parentIds").get().getFieldType()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index a012ebb297..1cfaafdb40 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -117,7 +117,7 @@ public class TestWriteCSVResult { final String values = splits[1]; final StringBuilder expectedBuilder = new StringBuilder(); - expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + dateValue + "\",\"" + timeValue + "\",\"" + timestampValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,"); + expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,"); final String expectedValues = expectedBuilder.toString(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java index 4a103563a6..a4f356ffa7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java @@ -153,7 +153,10 @@ public class TestInferJsonSchemaAccessStrategy { // TIME value and a STRING should 
be inferred as a STRING field final RecordField maybeTimeField = schema.getField("maybeTime").get(); - assertEquals(RecordFieldType.STRING, maybeTimeField.getDataType().getFieldType()); + assertEquals( + RecordFieldType.CHOICE.getChoiceDataType().getFieldType(), + maybeTimeField.getDataType().getFieldType()) + ; // DATE value and a null value should be inferred as a DATE field final RecordField maybeDateField = schema.getField("maybeDate").get(); @@ -169,7 +172,7 @@ public class TestInferJsonSchemaAccessStrategy { final RecordSchema schema = inferSchema(file); assertSame(RecordFieldType.STRING, schema.getDataType("name").get().getFieldType()); - assertSame(RecordFieldType.STRING, schema.getDataType("age").get().getFieldType()); + assertSame(RecordFieldType.CHOICE, schema.getDataType("age").get().getFieldType()); final DataType valuesDataType = schema.getDataType("values").get(); assertSame(RecordFieldType.CHOICE, valuesDataType.getFieldType()); @@ -178,7 +181,10 @@ public class TestInferJsonSchemaAccessStrategy { final List possibleTypes = valuesChoiceType.getPossibleSubTypes(); assertEquals(2, possibleTypes.size()); assertTrue(possibleTypes.contains(RecordFieldType.STRING.getDataType())); - assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.CHOICE.getChoiceDataType( + RecordFieldType.LONG.getDataType(), + RecordFieldType.STRING.getDataType() + )))); assertSame(RecordFieldType.STRING, schema.getDataType("nullValue").get().getFieldType()); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java new file mode 100644 index 0000000000..caf0038493 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
new file mode 100644
index 0000000000..caf0038493
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.inference;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static com.google.common.collect.Collections2.permutations;
+import static org.junit.Assert.assertEquals;
+
+public class TestFieldTypeInference {
+    private FieldTypeInference testSubject;
+
+    @Before
+    public void setUp() throws Exception {
+        testSubject = new FieldTypeInference();
+    }
+
+    @Test
+    public void testToDataTypeWith_SHORT_INT_LONG_shouldReturn_LONG() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.SHORT.getDataType(),
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.LONG.getDataType()
+        );
+
+        DataType expected = RecordFieldType.LONG.getDataType();
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_ShouldReturn_INT_FLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_STRING_shouldReturn_INT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_STRING_shouldReturn_INT_FLOAT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWithMultipleRecord() {
+        // GIVEN
+        String fieldName = "fieldName";
+        DataType fieldType1 = RecordFieldType.INT.getDataType();
+        DataType fieldType2 = RecordFieldType.FLOAT.getDataType();
+        DataType fieldType3 = RecordFieldType.STRING.getDataType();
+
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType1)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType3)),
+                RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2))
+        );
+
+        DataType expected = RecordFieldType.RECORD.getRecordDataType(createRecordSchema(
+                fieldName,
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        fieldType1,
+                        fieldType2,
+                        fieldType3
+                )
+        ));
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
+        return new SimpleRecordSchema(Arrays.asList(
+                new RecordField(fieldName, fieldType)
+        ));
+    }
+
+    private <E> void runWithAllPermutations(BiFunction<List<DataType>, E, ?> test, List<DataType> input, E expected) {
+        permutations(input).forEach(inputPermutation -> test.apply(inputPermutation, expected));
+    }
+
+    private Void testToDataTypeShouldReturnChoice(List<DataType> dataTypes, Set<DataType> expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, new HashSet<>(((ChoiceDataType) actual).getPossibleSubTypes()));
+
+        return null;
+    }
+
+    private Void testToDataTypeShouldReturnSingleType(List<DataType> dataTypes, DataType expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, actual);
+
+        return null;
+    }
+}
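For readers skimming the patch, the new test above boils down to the following usage pattern for FieldTypeInference. This is a sketch, not part of the patch: the standalone wrapper class and main method are illustrative, while addPossibleDataType and toDataType are the calls exercised by the test.

    import org.apache.nifi.schema.inference.FieldTypeInference;
    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;

    public class FieldTypeInferenceSketch {
        public static void main(String[] args) {
            final FieldTypeInference inference = new FieldTypeInference();

            // Register the type observed for each value of a single field, in any order.
            inference.addPossibleDataType(RecordFieldType.INT.getDataType());
            inference.addPossibleDataType(RecordFieldType.STRING.getDataType());

            // Before this fix STRING was treated as wider than INT, so the INT type was
            // lost and the result was plain STRING. After the fix the result is a
            // CHOICE of INT and STRING, independent of insertion order.
            final DataType inferred = inference.toDataType();
            System.out.println(inferred.getFieldType()); // expected: CHOICE
        }
    }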
"record" : null,