NIFI-7300 Allowing narrow numeric types to fit against schema check with wider type; Allowing doubles with value within float precision to be considered as valid floats (NIFI-7302)

This commit is contained in:
Bence Simon 2020-04-07 12:48:39 +02:00 committed by Mark Payne
parent 99e69f0252
commit 923a07a5db
5 changed files with 498 additions and 22 deletions

View File

@ -232,7 +232,7 @@ public enum RecordFieldType {
this.defaultFormat = null; this.defaultFormat = null;
this.defaultDataType = new DataType(this, defaultFormat); this.defaultDataType = new DataType(this, defaultFormat);
this.narrowDataTypes = new HashSet<>(Arrays.asList(narrowDataTypes)); this.narrowDataTypes = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(narrowDataTypes)));
} }
private RecordFieldType(final String simpleName, final String defaultFormat) { private RecordFieldType(final String simpleName, final String defaultFormat) {
@ -364,4 +364,8 @@ public enum RecordFieldType {
public static RecordFieldType of(final String typeString) { public static RecordFieldType of(final String typeString) {
return SIMPLE_NAME_MAP.get(typeString); return SIMPLE_NAME_MAP.get(typeString);
} }
/**
 * Exposes the narrow data types registered for this field type.
 *
 * @return the set held in the {@code narrowDataTypes} field of this enum constant
 */
public Set<RecordFieldType> getNarrowDataTypes() {
    return this.narrowDataTypes;
}
} }

View File

@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -100,6 +101,34 @@ public class DataTypeUtils {
private static final Supplier<DateFormat> DEFAULT_TIME_FORMAT = () -> getDateFormat(RecordFieldType.TIME.getDefaultFormat()); private static final Supplier<DateFormat> DEFAULT_TIME_FORMAT = () -> getDateFormat(RecordFieldType.TIME.getDefaultFormat());
private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()); private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
// Significand widths in bits, counting the implicit leading bit.
private static final int FLOAT_SIGNIFICAND_PRECISION = 24; // As specified in IEEE 754 binary32
private static final int DOUBLE_SIGNIFICAND_PRECISION = 53; // As specified in IEEE 754 binary64

// Every whole number whose magnitude does not exceed 2^precision is exactly representable in the
// corresponding floating point type. Kept as primitives so range checks do not unbox on every call.
private static final long MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT = 1L << FLOAT_SIGNIFICAND_PRECISION;
private static final long MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT = -MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT;
private static final long MAX_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE = 1L << DOUBLE_SIGNIFICAND_PRECISION;
private static final long MIN_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE = -MAX_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE;

// BigInteger views of the bounds above. Despite the names, these are the limits of the precisely
// representable whole numbers, not the largest/smallest values of the floating point types.
private static final BigInteger MAX_FLOAT_VALUE_IN_BIGINT = BigInteger.valueOf(MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT);
private static final BigInteger MIN_FLOAT_VALUE_IN_BIGINT = BigInteger.valueOf(MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT);
private static final BigInteger MAX_DOUBLE_VALUE_IN_BIGINT = BigInteger.valueOf(MAX_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE);
private static final BigInteger MIN_DOUBLE_VALUE_IN_BIGINT = BigInteger.valueOf(MIN_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE);

// The finite range of float, widened to double for comparison against double values.
private static final double MAX_FLOAT_VALUE_IN_DOUBLE = Float.MAX_VALUE;
private static final double MIN_FLOAT_VALUE_IN_DOUBLE = -MAX_FLOAT_VALUE_IN_DOUBLE;

// Maps each numeric field type to a check for the exact Java class carrying a value of that type.
private static final Map<RecordFieldType, Predicate<Object>> NUMERIC_VALIDATORS;

static {
    final Map<RecordFieldType, Predicate<Object>> validators = new EnumMap<>(RecordFieldType.class);
    validators.put(RecordFieldType.BIGINT, value -> value instanceof BigInteger);
    validators.put(RecordFieldType.LONG, value -> value instanceof Long);
    validators.put(RecordFieldType.INT, value -> value instanceof Integer);
    validators.put(RecordFieldType.BYTE, value -> value instanceof Byte);
    validators.put(RecordFieldType.SHORT, value -> value instanceof Short);
    validators.put(RecordFieldType.DOUBLE, value -> value instanceof Double);
    validators.put(RecordFieldType.FLOAT, value -> value instanceof Float);
    // Published read-only so the lookup table cannot be altered after class initialisation.
    NUMERIC_VALIDATORS = Collections.unmodifiableMap(validators);
}
public static Object convertType(final Object value, final DataType dataType, final String fieldName) { public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
return convertType(value, dataType, fieldName, StandardCharsets.UTF_8); return convertType(value, dataType, fieldName, StandardCharsets.UTF_8);
} }
@ -1785,4 +1814,133 @@ public class DataTypeUtils {
return Charset.forName(charsetName); return Charset.forName(charsetName);
} }
} }
/**
 * Tells whether the given value is an {@link Integer} that a float can hold without precision loss. The decision
 * depends on the numerical value only: it has to lie between {@code MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT} and
 * {@code MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT} (both inclusive), which follow from the significand bits of a float.
 *
 * @param value The value to check.
 *
 * @return True if the value is an Integer within the interval, false for anything else.
 */
public static boolean isIntegerFitsToFloat(final Object value) {
    if (value instanceof Integer) {
        final int candidate = (Integer) value;
        return candidate >= MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT && candidate <= MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT;
    }

    return false;
}
/**
 * Tells whether the given value is a {@link Long} that a float can hold without precision loss. The decision
 * depends on the numerical value only: it has to lie between {@code MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT} and
 * {@code MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT} (both inclusive), which follow from the significand bits of a float.
 *
 * @param value The value to check.
 *
 * @return True if the value is a Long within the interval, false for anything else.
 */
public static boolean isLongFitsToFloat(final Object value) {
    if (value instanceof Long) {
        final long candidate = (Long) value;
        return candidate >= MIN_GUARANTEED_PRECISE_WHOLE_IN_FLOAT && candidate <= MAX_GUARANTEED_PRECISE_WHOLE_IN_FLOAT;
    }

    return false;
}
/**
 * Tells whether the given value is a {@link Long} that a double can hold without precision loss. The decision
 * depends on the numerical value only: it has to lie between {@code MIN_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE} and
 * {@code MAX_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE} (both inclusive), which follow from the significand bits of a double.
 *
 * @param value The value to check.
 *
 * @return True if the value is a Long within the interval, false for anything else.
 */
public static boolean isLongFitsToDouble(final Object value) {
    if (value instanceof Long) {
        final long candidate = (Long) value;
        return candidate >= MIN_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE && candidate <= MAX_GUARANTEED_PRECISE_WHOLE_IN_DOUBLE;
    }

    return false;
}
/**
 * Tells whether the given value is a {@link BigInteger} that a float can hold without precision loss. The decision
 * depends on the numerical value only: it has to lie between {@code MIN_FLOAT_VALUE_IN_BIGINT} and
 * {@code MAX_FLOAT_VALUE_IN_BIGINT} (both inclusive), which follow from the significand bits of a float.
 *
 * @param value The value to check.
 *
 * @return True if the value is a BigInteger within the interval, false for anything else.
 */
public static boolean isBigIntFitsToFloat(final Object value) {
    if (value instanceof BigInteger) {
        final BigInteger candidate = (BigInteger) value;
        final boolean atLeastMinimum = MIN_FLOAT_VALUE_IN_BIGINT.compareTo(candidate) <= 0;
        final boolean atMostMaximum = MAX_FLOAT_VALUE_IN_BIGINT.compareTo(candidate) >= 0;
        return atLeastMinimum && atMostMaximum;
    }

    return false;
}
/**
 * Tells whether the given value is a {@link BigInteger} that a double can hold without precision loss. The decision
 * depends on the numerical value only: it has to lie between {@code MIN_DOUBLE_VALUE_IN_BIGINT} and
 * {@code MAX_DOUBLE_VALUE_IN_BIGINT} (both inclusive), which follow from the significand bits of a double.
 *
 * @param value The value to check.
 *
 * @return True if the value is a BigInteger within the interval, false for anything else.
 */
public static boolean isBigIntFitsToDouble(final Object value) {
    if (value instanceof BigInteger) {
        final BigInteger candidate = (BigInteger) value;
        final boolean atLeastMinimum = MIN_DOUBLE_VALUE_IN_BIGINT.compareTo(candidate) <= 0;
        final boolean atMostMaximum = MAX_DOUBLE_VALUE_IN_BIGINT.compareTo(candidate) >= 0;
        return atLeastMinimum && atMostMaximum;
    }

    return false;
}
/**
 * Tells whether the given value is a {@link Double} lying inside the interval bounded by
 * {@code MIN_FLOAT_VALUE_IN_DOUBLE} and {@code MAX_FLOAT_VALUE_IN_DOUBLE} (both inclusive).
 *
 * <p>
 * Note: only the covered range is examined, precision is not. By the time the value arrives here, its double
 * representation might already differ slightly from the original text value. A NaN fails both comparisons and
 * is therefore rejected.
 * </p>
 *
 * @param value The value to check.
 *
 * @return True if the value is a Double within the interval, false for anything else.
 */
public static boolean isDoubleWithinFloatInterval(final Object value) {
    if (value instanceof Double) {
        final double candidate = (Double) value;
        return candidate >= MIN_FLOAT_VALUE_IN_DOUBLE && candidate <= MAX_FLOAT_VALUE_IN_DOUBLE;
    }

    return false;
}
/**
 * Checks if an incoming value satisfies the requirements of a given (numeric) type or any of its narrow data types.
 *
 * <p>
 * Only field types registered in {@code NUMERIC_VALIDATORS} are supported. For any other field type (or for
 * {@code null}) the method returns false instead of failing, as no numeric value can be said to fit it.
 * </p>
 *
 * @param value Incoming value.
 * @param fieldType The expected field type.
 *
 * @return Returns true if the incoming value satisfies the data type or any of its narrow data types. Otherwise returns false.
 */
public static boolean isFittingNumberType(final Object value, final RecordFieldType fieldType) {
    final Predicate<Object> ownValidator = fieldType == null ? null : NUMERIC_VALIDATORS.get(fieldType);

    // A missing validator means the expected type is not numeric; without this guard the call below would throw a NullPointerException.
    if (ownValidator == null) {
        return false;
    }

    if (ownValidator.test(value)) {
        return true;
    }

    for (final RecordFieldType narrowType : fieldType.getNarrowDataTypes()) {
        final Predicate<Object> narrowValidator = NUMERIC_VALIDATORS.get(narrowType);

        // Narrow types without a registered validator are skipped rather than dereferenced.
        if (narrowValidator != null && narrowValidator.test(value)) {
            return true;
        }
    }

    return false;
}
} }

View File

@ -579,4 +579,155 @@ public class TestDataTypeUtils {
assertEquals(Optional.ofNullable(expected), actual); assertEquals(Optional.ofNullable(expected), actual);
}); });
} }
@Test
public void testIsIntegerFitsToFloat() {
    // 2^24: the largest magnitude the method is expected to accept
    final int limit = 1 << 24;

    // Integers inside the interval, both boundaries included
    for (final int accepted : new int[] {0, 9, limit, -limit}) {
        assertTrue(DataTypeUtils.isIntegerFitsToFloat(accepted));
    }

    // Anything that is not an Integer
    for (final Object wrongType : new Object[] {"test", 9L, 9.0}) {
        assertFalse(DataTypeUtils.isIntegerFitsToFloat(wrongType));
    }

    // Integers outside the interval, including the first value past each boundary
    for (final int rejected : new int[] {Integer.MAX_VALUE, Integer.MIN_VALUE, limit + 1, -limit - 1}) {
        assertFalse(DataTypeUtils.isIntegerFitsToFloat(rejected));
    }
}
@Test
public void testIsLongFitsToFloat() {
    // 2^24: the largest magnitude the method is expected to accept
    final long limit = 1L << 24;

    // Longs inside the interval, both boundaries included
    for (final long accepted : new long[] {0L, 9L, limit, -limit}) {
        assertTrue(DataTypeUtils.isLongFitsToFloat(accepted));
    }

    // Anything that is not a Long
    for (final Object wrongType : new Object[] {"test", 9, 9.0}) {
        assertFalse(DataTypeUtils.isLongFitsToFloat(wrongType));
    }

    // Longs outside the interval, including the first value past each boundary
    for (final long rejected : new long[] {Long.MAX_VALUE, Long.MIN_VALUE, limit + 1L, -limit - 1L}) {
        assertFalse(DataTypeUtils.isLongFitsToFloat(rejected));
    }
}
@Test
public void testIsLongFitsToDouble() {
    // 2^53: the largest magnitude the method is expected to accept
    final long limit = 1L << 53;

    // Longs inside the interval, both boundaries included
    for (final long accepted : new long[] {0L, 9L, limit, -limit}) {
        assertTrue(DataTypeUtils.isLongFitsToDouble(accepted));
    }

    // Anything that is not a Long
    for (final Object wrongType : new Object[] {"test", 9, 9.0}) {
        assertFalse(DataTypeUtils.isLongFitsToDouble(wrongType));
    }

    // Longs outside the interval, including the first value past each boundary
    for (final long rejected : new long[] {Long.MAX_VALUE, Long.MIN_VALUE, limit + 1L, -limit - 1L}) {
        assertFalse(DataTypeUtils.isLongFitsToDouble(rejected));
    }
}
@Test
public void testIsBigIntFitsToFloat() {
    // 2^24: the largest magnitude the method is expected to accept
    final BigInteger maxRepresentableBigInt = BigInteger.valueOf(Double.valueOf(Math.pow(2, 24)).longValue());
    final BigInteger hugeBigInt = new BigInteger(String.join("", Collections.nCopies(100, "1")));

    // BigIntegers inside the interval, both boundaries included
    assertTrue(DataTypeUtils.isBigIntFitsToFloat(BigInteger.valueOf(0L)));
    assertTrue(DataTypeUtils.isBigIntFitsToFloat(BigInteger.valueOf(8L)));
    assertTrue(DataTypeUtils.isBigIntFitsToFloat(maxRepresentableBigInt));
    assertTrue(DataTypeUtils.isBigIntFitsToFloat(maxRepresentableBigInt.negate()));

    // Anything that is not a BigInteger
    assertFalse(DataTypeUtils.isBigIntFitsToFloat("test"));
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(9));
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(9.0));

    // BigIntegers far outside the interval
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(hugeBigInt));
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(hugeBigInt.negate()));

    // The first value past each boundary, mirroring the Integer and Long variants of this test
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(maxRepresentableBigInt.add(BigInteger.ONE)));
    assertFalse(DataTypeUtils.isBigIntFitsToFloat(maxRepresentableBigInt.negate().subtract(BigInteger.ONE)));
}
@Test
public void testIsBigIntFitsToDouble() {
    // 2^53: the largest magnitude the method is expected to accept
    final BigInteger maxRepresentableBigInt = BigInteger.valueOf(Double.valueOf(Math.pow(2, 53)).longValue());
    final BigInteger hugeBigInt = new BigInteger(String.join("", Collections.nCopies(100, "1")));

    // BigIntegers inside the interval, both boundaries included
    assertTrue(DataTypeUtils.isBigIntFitsToDouble(BigInteger.valueOf(0L)));
    assertTrue(DataTypeUtils.isBigIntFitsToDouble(BigInteger.valueOf(8L)));
    assertTrue(DataTypeUtils.isBigIntFitsToDouble(maxRepresentableBigInt));
    assertTrue(DataTypeUtils.isBigIntFitsToDouble(maxRepresentableBigInt.negate()));

    // Anything that is not a BigInteger
    assertFalse(DataTypeUtils.isBigIntFitsToDouble("test"));
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(9));
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(9.0));

    // BigIntegers far outside the interval
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(hugeBigInt));
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(hugeBigInt.negate()));

    // The first value past each boundary, mirroring the Long variant of this test
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(maxRepresentableBigInt.add(BigInteger.ONE)));
    assertFalse(DataTypeUtils.isBigIntFitsToDouble(maxRepresentableBigInt.negate().subtract(BigInteger.ONE)));
}
@Test
public void testIsDoubleWithinFloatInterval() {
    // Doubles inside the float range, including the extreme float values widened to double
    final double[] withinRange = {0D, 0.1D, Float.MAX_VALUE, Float.MIN_VALUE, -Float.MAX_VALUE, -Float.MIN_VALUE};
    for (final double accepted : withinRange) {
        assertTrue(DataTypeUtils.isDoubleWithinFloatInterval(accepted));
    }

    // Anything that is not a Double
    for (final Object wrongType : new Object[] {"test", 9, 9.0F}) {
        assertFalse(DataTypeUtils.isDoubleWithinFloatInterval(wrongType));
    }

    // Doubles beyond the float range on either side
    for (final double rejected : new double[] {Double.MAX_VALUE, -Double.MAX_VALUE}) {
        assertFalse(DataTypeUtils.isDoubleWithinFloatInterval(rejected));
    }
}
@Test
public void testIsFittingNumberType() {
    // One sample per Java numeric wrapper, each carrying the value nine
    final Object asByte = (byte) 9;
    final Object asShort = (short) 9;
    final Object asInt = 9;
    final Object asLong = 9L;
    final Object asBigInt = BigInteger.valueOf(9L);
    final Object asFloat = 9F;
    final Object asDouble = 9D;

    // BYTE accepts only Byte
    assertTrue(DataTypeUtils.isFittingNumberType(asByte, RecordFieldType.BYTE));
    assertFalse(DataTypeUtils.isFittingNumberType(asShort, RecordFieldType.BYTE));
    assertFalse(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.BYTE));
    assertFalse(DataTypeUtils.isFittingNumberType(asLong, RecordFieldType.BYTE));
    assertFalse(DataTypeUtils.isFittingNumberType(asBigInt, RecordFieldType.BYTE));

    // SHORT accepts Byte and Short
    assertTrue(DataTypeUtils.isFittingNumberType(asByte, RecordFieldType.SHORT));
    assertTrue(DataTypeUtils.isFittingNumberType(asShort, RecordFieldType.SHORT));
    assertFalse(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.SHORT));
    assertFalse(DataTypeUtils.isFittingNumberType(asLong, RecordFieldType.SHORT));
    assertFalse(DataTypeUtils.isFittingNumberType(asBigInt, RecordFieldType.SHORT));

    // INT accepts Byte, Short and Integer
    assertTrue(DataTypeUtils.isFittingNumberType(asByte, RecordFieldType.INT));
    assertTrue(DataTypeUtils.isFittingNumberType(asShort, RecordFieldType.INT));
    assertTrue(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.INT));
    assertFalse(DataTypeUtils.isFittingNumberType(asLong, RecordFieldType.INT));
    assertFalse(DataTypeUtils.isFittingNumberType(asBigInt, RecordFieldType.INT));

    // LONG accepts everything up to Long
    assertTrue(DataTypeUtils.isFittingNumberType(asByte, RecordFieldType.LONG));
    assertTrue(DataTypeUtils.isFittingNumberType(asShort, RecordFieldType.LONG));
    assertTrue(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.LONG));
    assertTrue(DataTypeUtils.isFittingNumberType(asLong, RecordFieldType.LONG));
    assertFalse(DataTypeUtils.isFittingNumberType(asBigInt, RecordFieldType.LONG));

    // BIGINT accepts every whole number sample
    assertTrue(DataTypeUtils.isFittingNumberType(asByte, RecordFieldType.BIGINT));
    assertTrue(DataTypeUtils.isFittingNumberType(asShort, RecordFieldType.BIGINT));
    assertTrue(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.BIGINT));
    assertTrue(DataTypeUtils.isFittingNumberType(asLong, RecordFieldType.BIGINT));
    assertTrue(DataTypeUtils.isFittingNumberType(asBigInt, RecordFieldType.BIGINT));

    // FLOAT accepts only Float
    assertTrue(DataTypeUtils.isFittingNumberType(asFloat, RecordFieldType.FLOAT));
    assertFalse(DataTypeUtils.isFittingNumberType(asDouble, RecordFieldType.FLOAT));
    assertFalse(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.FLOAT));

    // DOUBLE accepts Float and Double
    assertTrue(DataTypeUtils.isFittingNumberType(asFloat, RecordFieldType.DOUBLE));
    assertTrue(DataTypeUtils.isFittingNumberType(asDouble, RecordFieldType.DOUBLE));
    assertFalse(DataTypeUtils.isFittingNumberType(asInt, RecordFieldType.DOUBLE));
}
} }

View File

@ -17,9 +17,6 @@
package org.apache.nifi.schema.validation; package org.apache.nifi.schema.validation;
import java.math.BigInteger;
import java.util.Map;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
@ -35,7 +32,11 @@ import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError; import org.apache.nifi.serialization.record.validation.ValidationError;
import org.apache.nifi.serialization.record.validation.ValidationErrorType; import org.apache.nifi.serialization.record.validation.ValidationErrorType;
import java.util.Map;
public class StandardSchemaValidator implements RecordSchemaValidator { public class StandardSchemaValidator implements RecordSchemaValidator {
private final SchemaValidationContext validationContext; private final SchemaValidationContext validationContext;
public StandardSchemaValidator(final SchemaValidationContext validationContext) { public StandardSchemaValidator(final SchemaValidationContext validationContext) {
@ -233,36 +234,45 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
} }
return false; return false;
case BIGINT:
return value instanceof BigInteger;
case BOOLEAN: case BOOLEAN:
return value instanceof Boolean; return value instanceof Boolean;
case BYTE:
return value instanceof Byte;
case CHAR: case CHAR:
return value instanceof Character; return value instanceof Character;
case DATE: case DATE:
return value instanceof java.sql.Date; return value instanceof java.sql.Date;
case DOUBLE:
return value instanceof Double;
case FLOAT:
// Some readers do not provide float vs. double.
// We should consider if it makes sense to allow either a Float or a Double here or have
// a Reader indicate whether or not it supports higher precision, etc.
// Same goes for Short/Integer
return value instanceof Float;
case INT:
return value instanceof Integer;
case LONG:
return value instanceof Long;
case SHORT:
return value instanceof Short;
case STRING: case STRING:
return value instanceof String; return value instanceof String;
case TIME: case TIME:
return value instanceof java.sql.Time; return value instanceof java.sql.Time;
case TIMESTAMP: case TIMESTAMP:
return value instanceof java.sql.Timestamp; return value instanceof java.sql.Timestamp;
// Numeric data types
case BIGINT:
case LONG:
case INT:
case SHORT:
case BYTE:
return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType());
case DOUBLE:
return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType())
|| value instanceof Byte
|| value instanceof Short
|| value instanceof Integer
|| DataTypeUtils.isLongFitsToDouble(value)
|| DataTypeUtils.isBigIntFitsToDouble(value);
case FLOAT:
// Some readers do not provide float vs. double.
// We should consider if it makes sense to allow either a Float or a Double here or have
// a Reader indicate whether or not it supports higher precision, etc.
// Same goes for Short/Integer
return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType())
|| value instanceof Byte
|| value instanceof Short
|| DataTypeUtils.isDoubleWithinFloatInterval(value)
|| DataTypeUtils.isIntegerFitsToFloat(value)
|| DataTypeUtils.isLongFitsToFloat(value)
|| DataTypeUtils.isBigIntFitsToFloat(value);
} }
return false; return false;

View File

@ -30,11 +30,15 @@ import java.text.DateFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
@ -46,9 +50,25 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult; import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError; import org.apache.nifi.serialization.record.validation.ValidationError;
import org.apache.nifi.serialization.record.validation.ValidationErrorType;
import org.junit.Test; import org.junit.Test;
public class TestStandardSchemaValidator { public class TestStandardSchemaValidator {
// Number of significand bits used to derive the bounds below
private static final int FLOAT_BITS_PRECISION = 24;
private static final int DOUBLE_BITS_PRECISION = 53;

// 2^24 and 2^53; kept boxed because the tests call intValue() on them and pass them as record values
private static final Long MAX_PRECISE_WHOLE_IN_FLOAT = 1L << FLOAT_BITS_PRECISION;
private static final Long MAX_PRECISE_WHOLE_IN_DOUBLE = 1L << DOUBLE_BITS_PRECISION;

// Field types for which additional "<narrow>_as_<wide>" fields are generated in the schema under test
private static final Set<RecordFieldType> NUMERIC_TYPES = new HashSet<>(Arrays.asList(
        RecordFieldType.BYTE, RecordFieldType.SHORT, RecordFieldType.INT, RecordFieldType.LONG,
        RecordFieldType.BIGINT, RecordFieldType.FLOAT, RecordFieldType.DOUBLE));
@Test @Test
public void testValidateCorrectSimpleTypesStrictValidation() throws ParseException { public void testValidateCorrectSimpleTypesStrictValidation() throws ParseException {
@ -65,6 +85,12 @@ public class TestStandardSchemaValidator {
} else { } else {
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType())); fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
} }
if (NUMERIC_TYPES.contains(fieldType)) {
for (final RecordFieldType narrowType : fieldType.getNarrowDataTypes()) {
fields.add(new RecordField(narrowType.name().toLowerCase() + "_as_" + fieldType.name().toLowerCase(), fieldType.getDataType()));
}
}
} }
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
@ -103,6 +129,22 @@ public class TestStandardSchemaValidator {
valueMap.put("map", intMap); valueMap.put("map", intMap);
valueMap.put("mapRecord", mapRecord); valueMap.put("mapRecord", mapRecord);
valueMap.put("byte_as_short", (byte) 8);
valueMap.put("short_as_int", (short) 8);
valueMap.put("byte_as_int", (byte) 8);
valueMap.put("int_as_long", 9);
valueMap.put("short_as_long", (short) 8);
valueMap.put("byte_as_long", (byte) 1);
valueMap.put("byte_as_bigint", (byte) 8);
valueMap.put("short_as_bigint", (short) 8);
valueMap.put("int_as_bigint", 8);
valueMap.put("long_as_bigint", 8L);
valueMap.put("float_as_double", 8.0F);
final Record record = new MapRecord(schema, valueMap); final Record record = new MapRecord(schema, valueMap);
final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true); final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true);
@ -114,6 +156,75 @@ public class TestStandardSchemaValidator {
assertTrue(result.getValidationErrors().isEmpty()); assertTrue(result.getValidationErrors().isEmpty());
} }
@Test
public void testStringDoesNotAllowNarrowTypesWhenStrictValidation() {
    // An Integer value is expected to be reported as invalid for a STRING field
    final Object integerValue = 12345;
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(integerValue, RecordFieldType.STRING);
}
@Test
public void testDoubleWithinFloatRangeIsConsideredAsValid() {
    // A Double inside the float range is expected to pass for a FLOAT field
    final Object doubleValue = 1.5191525220870972D;
    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(doubleValue, RecordFieldType.FLOAT);
}
@Test
public void testByteIsConsideredToBeValidFloatingPoint() {
    // A Byte is expected to pass for both floating point field types
    for (final RecordFieldType floatingPointType : new RecordFieldType[] {RecordFieldType.FLOAT, RecordFieldType.DOUBLE}) {
        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, floatingPointType);
    }
}
@Test
public void testShortIsConsideredToBeValidFloatingPoint() {
    // A Short is expected to pass for both floating point field types
    for (final RecordFieldType floatingPointType : new RecordFieldType[] {RecordFieldType.FLOAT, RecordFieldType.DOUBLE}) {
        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, floatingPointType);
    }
}
@Test
public void testIntegerWithinRangeIsConsideredToBeValidFloatingPoint() {
    // The upper boundary for FLOAT, and the largest int for DOUBLE
    final int largestForFloat = MAX_PRECISE_WHOLE_IN_FLOAT.intValue();
    final int largestForDouble = Integer.MAX_VALUE;

    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(largestForFloat, RecordFieldType.FLOAT);
    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(largestForDouble, RecordFieldType.DOUBLE);
}
@Test
public void testIntegerOutsideRangeIsConsideredAsInvalid() {
    // The first int past the upper boundary for FLOAT
    final int firstTooLargeForFloat = MAX_PRECISE_WHOLE_IN_FLOAT.intValue() + 1;
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(firstTooLargeForFloat, RecordFieldType.FLOAT);

    // No DOUBLE counterpart: double handles integer completely
}
@Test
public void testLongWithinRangeIsConsideredToBeValidFloatingPoint() {
    // Each upper boundary is expected to be accepted by the matching field type
    final Long largestForFloat = MAX_PRECISE_WHOLE_IN_FLOAT;
    final Long largestForDouble = MAX_PRECISE_WHOLE_IN_DOUBLE;

    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(largestForFloat, RecordFieldType.FLOAT);
    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(largestForDouble, RecordFieldType.DOUBLE);
}
@Test
public void testLongOutsideRangeIsConsideredAsInvalid() {
    // The first long past each upper boundary is expected to be rejected
    final long firstTooLargeForFloat = MAX_PRECISE_WHOLE_IN_FLOAT + 1;
    final long firstTooLargeForDouble = MAX_PRECISE_WHOLE_IN_DOUBLE + 1;

    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(firstTooLargeForFloat, RecordFieldType.FLOAT);
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(firstTooLargeForDouble, RecordFieldType.DOUBLE);
}
@Test
public void testBigintWithinRangeIsConsideredToBeValidFloatingPoint() {
    // A small BigInteger is expected to pass for both floating point field types
    final BigInteger smallBigInt = BigInteger.valueOf(5L);

    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(smallBigInt, RecordFieldType.FLOAT);
    whenValueIsAcceptedAsDataTypeThenConsideredAsValid(smallBigInt, RecordFieldType.DOUBLE);
}
@Test
public void testBigintOutsideRangeIsConsideredAsInvalid() {
    // The value has to be an actual BigInteger: passing the bare digit string would only prove that a
    // String is rejected, leaving the out-of-range BigInteger path untested.
    final BigInteger hugeBigInt = new BigInteger(String.join("", Collections.nCopies(100, "1")));

    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(hugeBigInt, RecordFieldType.FLOAT);
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(hugeBigInt, RecordFieldType.DOUBLE);
}
@Test
public void testDoubleAboveFloatRangeIsConsideredAsInvalid() {
    // A double ten percent beyond the largest float is expected to be rejected for a FLOAT field
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(Float.MAX_VALUE * 1.1, RecordFieldType.FLOAT);
}
@Test
public void testDoubleBelowFloatRangeIsConsideredAsInvalid() {
    // A double ten percent beyond the most negative float is expected to be rejected for a FLOAT field
    whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(Float.MAX_VALUE * -1.1, RecordFieldType.FLOAT);
}
@Test @Test
public void testValidateWrongButCoerceableType() throws ParseException { public void testValidateWrongButCoerceableType() throws ParseException {
@ -311,4 +422,46 @@ public class TestStandardSchemaValidator {
assertNotNull(result.getValidationErrors()); assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty()); assertTrue(result.getValidationErrors().isEmpty());
} }
// Validates a single-field record holding the value and asserts that the outcome is valid.
private void whenValueIsAcceptedAsDataTypeThenConsideredAsValid(final Object value, final RecordFieldType schemaDataType) {
    thenSingleValueIsValid(whenSingleValueIsTested(value, schemaDataType));
}
// Validates a single-field record holding the value and asserts that the outcome is invalid.
private void whenValueIsNotAcceptedAsDataTypeThenConsideredAsInvalid(final Object value, final RecordFieldType schemaDataType) {
    thenSingleValueIsInvalid(whenSingleValueIsTested(value, schemaDataType));
}
// Builds a schema with one field named "test" of the given type, wraps the value into a record
// of that schema and returns what StandardSchemaValidator reports for it.
private SchemaValidationResult whenSingleValueIsTested(final Object value, final RecordFieldType schemaDataType) {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("test", schemaDataType.getDataType()));
    final RecordSchema singleFieldSchema = new SimpleRecordSchema(schemaFields);

    final Map<String, Object> recordValues = new LinkedHashMap<>();
    recordValues.put("test", value);
    final Record singleFieldRecord = new MapRecord(singleFieldSchema, recordValues);

    final SchemaValidationContext context = new SchemaValidationContext(singleFieldSchema, false, true);
    return new StandardSchemaValidator(context).validate(singleFieldRecord);
}
// Asserts that the result is valid and carries a non-null, empty collection of validation errors.
private void thenSingleValueIsValid(SchemaValidationResult result) {
    assertTrue(result.isValid());

    final Collection<ValidationError> reportedErrors = result.getValidationErrors();
    assertNotNull(reportedErrors);
    assertTrue(reportedErrors.isEmpty());
}
// Asserts that the result is invalid and carries exactly one INVALID_FIELD error for the "/test" field.
private void thenSingleValueIsInvalid(SchemaValidationResult result) {
    assertFalse(result.isValid());

    final Collection<ValidationError> reportedErrors = result.getValidationErrors();
    assertEquals(1, reportedErrors.size());

    final ValidationError onlyError = reportedErrors.iterator().next();
    assertEquals("/test", onlyError.getFieldName().get());
    assertEquals(ValidationErrorType.INVALID_FIELD, onlyError.getType());
}
} }