mirror of https://github.com/apache/nifi.git
NIFI-7925: ValidateRecord reports false positive for avro arrays with null elements
This commit is contained in:
parent
55cb8d73cb
commit
d05d0c6240
Binary file not shown.
|
@ -288,17 +288,30 @@ public enum RecordFieldType {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a Data Type that represents an "ARRAY" type with the given element type.
|
* Returns a Data Type that represents an "ARRAY" type with the given element type.
|
||||||
|
* The returned array data type can't contain null elements.
|
||||||
*
|
*
|
||||||
* @param elementType the type of the arrays in the element
|
* @param elementType the type of the arrays in the element
|
||||||
* @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
|
* @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
|
||||||
* is not the ARRAY type.
|
* is not the ARRAY type.
|
||||||
*/
|
*/
|
||||||
public DataType getArrayDataType(final DataType elementType) {
|
public DataType getArrayDataType(final DataType elementType) {
|
||||||
|
return getArrayDataType(elementType, ArrayDataType.DEFAULT_NULLABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Data Type that represents an "ARRAY" type with the given element type.
|
||||||
|
*
|
||||||
|
* @param elementType the type of the arrays in the element
|
||||||
|
* @param elementsNullable indicates whether the array can contain null elements
|
||||||
|
* @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
|
||||||
|
* is not the ARRAY type.
|
||||||
|
*/
|
||||||
|
public DataType getArrayDataType(final DataType elementType, final boolean elementsNullable) {
|
||||||
if (this != ARRAY) {
|
if (this != ARRAY) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ArrayDataType(elementType);
|
return new ArrayDataType(elementType, elementsNullable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -341,17 +354,30 @@ public enum RecordFieldType {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a Data Type that represents a "MAP" type with the given value type.
|
* Returns a Data Type that represents a "MAP" type with the given value type.
|
||||||
|
* The returned map data type can't contain null values.
|
||||||
*
|
*
|
||||||
* @param valueDataType the type of the values in the map
|
* @param valueDataType the type of the values in the map
|
||||||
* @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType
|
* @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType
|
||||||
* is not the MAP type.
|
* is not the MAP type.
|
||||||
*/
|
*/
|
||||||
public DataType getMapDataType(final DataType valueDataType) {
|
public DataType getMapDataType(final DataType valueDataType) {
|
||||||
|
return getMapDataType(valueDataType, MapDataType.DEFAULT_NULLABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Data Type that represents a "MAP" type with the given value type.
|
||||||
|
*
|
||||||
|
* @param valueDataType the type of the values in the map
|
||||||
|
* @param valuesNullable indicates whether the map can contain null values
|
||||||
|
* @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType
|
||||||
|
* is not the MAP type.
|
||||||
|
*/
|
||||||
|
public DataType getMapDataType(final DataType valueDataType, boolean valuesNullable) {
|
||||||
if (this != MAP) {
|
if (this != MAP) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new MapDataType(valueDataType);
|
return new MapDataType(valueDataType, valuesNullable);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -23,11 +23,20 @@ import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class ArrayDataType extends DataType {
|
public class ArrayDataType extends DataType {
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_NULLABLE = false;
|
||||||
|
|
||||||
private final DataType elementType;
|
private final DataType elementType;
|
||||||
|
private final boolean elementsNullable;
|
||||||
|
|
||||||
public ArrayDataType(final DataType elementType) {
|
public ArrayDataType(final DataType elementType) {
|
||||||
|
this(elementType, DEFAULT_NULLABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArrayDataType(final DataType elementType, boolean elementsNullable) {
|
||||||
super(RecordFieldType.ARRAY, null);
|
super(RecordFieldType.ARRAY, null);
|
||||||
this.elementType = elementType;
|
this.elementType = elementType;
|
||||||
|
this.elementsNullable = elementsNullable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataType getElementType() {
|
public DataType getElementType() {
|
||||||
|
@ -39,25 +48,23 @@ public class ArrayDataType extends DataType {
|
||||||
return RecordFieldType.ARRAY;
|
return RecordFieldType.ARRAY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public boolean isElementsNullable() {
|
||||||
public int hashCode() {
|
return elementsNullable;
|
||||||
return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(final Object obj) {
|
public boolean equals(Object o) {
|
||||||
if (obj == this) {
|
if (this == o) return true;
|
||||||
return true;
|
if (!(o instanceof ArrayDataType)) return false;
|
||||||
}
|
if (!super.equals(o)) return false;
|
||||||
if (obj == null) {
|
ArrayDataType that = (ArrayDataType) o;
|
||||||
return false;
|
return isElementsNullable() == that.isElementsNullable()
|
||||||
}
|
&& Objects.equals(getElementType(), that.getElementType());
|
||||||
if (!(obj instanceof ArrayDataType)) {
|
}
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ArrayDataType other = (ArrayDataType) obj;
|
@Override
|
||||||
return Objects.equals(elementType, other.elementType);
|
public int hashCode() {
|
||||||
|
return Objects.hash(super.hashCode(), getElementType(), isElementsNullable());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,41 +23,48 @@ import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class MapDataType extends DataType {
|
public class MapDataType extends DataType {
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_NULLABLE = false;
|
||||||
|
|
||||||
private final DataType valueType;
|
private final DataType valueType;
|
||||||
|
private final boolean valuesNullable;
|
||||||
|
|
||||||
public MapDataType(final DataType elementType) {
|
public MapDataType(final DataType elementType) {
|
||||||
|
this(elementType, DEFAULT_NULLABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MapDataType(final DataType elementType, boolean valuesNullable) {
|
||||||
super(RecordFieldType.MAP, null);
|
super(RecordFieldType.MAP, null);
|
||||||
this.valueType = elementType;
|
this.valueType = elementType;
|
||||||
|
this.valuesNullable = valuesNullable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataType getValueType() {
|
public DataType getValueType() {
|
||||||
return valueType;
|
return valueType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isValuesNullable() {
|
||||||
|
return valuesNullable;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordFieldType getFieldType() {
|
public RecordFieldType getFieldType() {
|
||||||
return RecordFieldType.MAP;
|
return RecordFieldType.MAP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public boolean equals(Object o) {
|
||||||
return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
|
if (this == o) return true;
|
||||||
|
if (!(o instanceof MapDataType)) return false;
|
||||||
|
if (!super.equals(o)) return false;
|
||||||
|
MapDataType that = (MapDataType) o;
|
||||||
|
return valuesNullable == that.valuesNullable
|
||||||
|
&& Objects.equals(getValueType(), that.getValueType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(final Object obj) {
|
public int hashCode() {
|
||||||
if (obj == this) {
|
return Objects.hash(super.hashCode(), getValueType(), valuesNullable);
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (obj == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!(obj instanceof MapDataType)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
final MapDataType other = (MapDataType) obj;
|
|
||||||
return Objects.equals(valueType, other.valueType);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -354,7 +354,9 @@ public class AvroTypeUtil {
|
||||||
|
|
||||||
switch (avroType) {
|
switch (avroType) {
|
||||||
case ARRAY:
|
case ARRAY:
|
||||||
return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType(), knownRecordTypes));
|
final DataType elementType = determineDataType(avroSchema.getElementType(), knownRecordTypes);
|
||||||
|
final boolean elementsNullable = isNullable(avroSchema.getElementType());
|
||||||
|
return RecordFieldType.ARRAY.getArrayDataType(elementType, elementsNullable);
|
||||||
case BYTES:
|
case BYTES:
|
||||||
case FIXED:
|
case FIXED:
|
||||||
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
|
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
|
||||||
|
@ -403,7 +405,8 @@ public class AvroTypeUtil {
|
||||||
case MAP:
|
case MAP:
|
||||||
final Schema valueSchema = avroSchema.getValueType();
|
final Schema valueSchema = avroSchema.getValueType();
|
||||||
final DataType valueType = determineDataType(valueSchema, knownRecordTypes);
|
final DataType valueType = determineDataType(valueSchema, knownRecordTypes);
|
||||||
return RecordFieldType.MAP.getMapDataType(valueType);
|
final boolean valuesNullable = isNullable(valueSchema);
|
||||||
|
return RecordFieldType.MAP.getMapDataType(valueType, valuesNullable);
|
||||||
case UNION: {
|
case UNION: {
|
||||||
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
|
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
if (validationContext.isStrictTypeChecking()) {
|
if (validationContext.isStrictTypeChecking()) {
|
||||||
if (!isTypeCorrect(rawValue, dataType)) {
|
if (!isTypeCorrect(rawValue, dataType)) {
|
||||||
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
||||||
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
|
"Value is of type " + classNameOrNull(rawValue) + " but was expected to be of type " + dataType));
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
// but will be false if the value is "123" and should be an Array or Record.
|
// but will be false if the value is "123" and should be an Array or Record.
|
||||||
if (!DataTypeUtils.isCompatibleDataType(rawValue, dataType)) {
|
if (!DataTypeUtils.isCompatibleDataType(rawValue, dataType)) {
|
||||||
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
||||||
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
|
"Value is of type " + classNameOrNull(rawValue) + " but was expected to be of type " + dataType));
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -140,7 +140,7 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
|
|
||||||
if (canonicalDataType == null) {
|
if (canonicalDataType == null) {
|
||||||
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
||||||
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
|
"Value is of type " + classNameOrNull(rawValue) + " but was expected to be of type " + dataType));
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -157,7 +157,7 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
|
if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
|
||||||
if (!(rawValue instanceof Record)) { // sanity check
|
if (!(rawValue instanceof Record)) { // sanity check
|
||||||
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
|
||||||
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + expectedDataType));
|
"Value is of type " + classNameOrNull(rawValue) + " but was expected to be of type " + expectedDataType));
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -189,6 +189,9 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
|
|
||||||
final Object[] array = (Object[]) value;
|
final Object[] array = (Object[]) value;
|
||||||
for (final Object arrayVal : array) {
|
for (final Object arrayVal : array) {
|
||||||
|
if (arrayVal == null && arrayDataType.isElementsNullable()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!isTypeCorrect(arrayVal, elementType)) {
|
if (!isTypeCorrect(arrayVal, elementType)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -202,6 +205,9 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
final Map<?, ?> map = (Map<?, ?>) value;
|
final Map<?, ?> map = (Map<?, ?>) value;
|
||||||
|
|
||||||
for (final Object mapValue : map.values()) {
|
for (final Object mapValue : map.values()) {
|
||||||
|
if (mapValue == null && mapDataType.isValuesNullable()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!isTypeCorrect(mapValue, valueDataType)) {
|
if (!isTypeCorrect(mapValue, valueDataType)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -287,4 +293,8 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|
||||||
private String concat(final String fieldPrefix, final RecordField field) {
|
private String concat(final String fieldPrefix, final RecordField field) {
|
||||||
return fieldPrefix + "/" + field.getFieldName();
|
return fieldPrefix + "/" + field.getFieldName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String classNameOrNull(Object value) {
|
||||||
|
return value == null ? "null" : value.getClass().getName();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -640,4 +640,38 @@ public class TestValidateRecord {
|
||||||
+ "The following 1 fields had values whose type did not match the schema: [/id]");
|
+ "The following 1 fields had values whose type did not match the schema: [/id]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidationForNullElementArrayAndMap() throws Exception {
|
||||||
|
AvroReader avroReader = new AvroReader();
|
||||||
|
runner.addControllerService("reader", avroReader);
|
||||||
|
runner.enableControllerService(avroReader);
|
||||||
|
|
||||||
|
|
||||||
|
final MockRecordWriter validWriter = new MockRecordWriter("valid", false);
|
||||||
|
runner.addControllerService("writer", validWriter);
|
||||||
|
runner.enableControllerService(validWriter);
|
||||||
|
|
||||||
|
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||||
|
runner.addControllerService("invalid-writer", invalidWriter);
|
||||||
|
runner.enableControllerService(invalidWriter);
|
||||||
|
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||||
|
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||||
|
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||||
|
runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
|
||||||
|
runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro"));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_INVALID, 0);
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||||
|
|
||||||
|
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||||
|
validFlowFile.assertAttributeEquals("record.count", "1");
|
||||||
|
validFlowFile.assertContentEquals("valid\n[text, null],{key=null}\n");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Binary file not shown.
Loading…
Reference in New Issue