NIFI-4671: This closes #2328. Ensure that Avro Schemas that are created properly denote fields as being nullable iff the schemas says they are, for non-top-level fields

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-12-06 11:40:05 -05:00 committed by joewitt
parent 600586d6be
commit f772f2f093
5 changed files with 18 additions and 8 deletions

View File

@ -127,7 +127,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final DataType dataType = getDataType(sqlType, rs, column); final DataType dataType = getDataType(sqlType, rs, column);
final String fieldName = metadata.getColumnLabel(column); final String fieldName = metadata.getColumnLabel(column);
final RecordField field = new RecordField(fieldName, dataType);
final int nullableFlag = metadata.isNullable(column);
final boolean nullable;
if (nullableFlag == ResultSetMetaData.columnNoNulls) {
nullable = false;
} else {
nullable = true;
}
final RecordField field = new RecordField(fieldName, dataType, nullable);
fields.add(field); fields.add(field);
} }

View File

@ -985,7 +985,7 @@ public class DataTypeUtils {
dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType()); dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType());
} }
return new RecordField(fieldName, dataType, defaultValue, aliases); return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable());
} }
public static boolean isScalarValue(final DataType dataType, final Object value) { public static boolean isScalarValue(final DataType dataType, final Object value) {

View File

@ -289,11 +289,12 @@ public class AvroTypeUtil {
final String fieldName = field.name(); final String fieldName = field.name();
final Schema fieldSchema = field.schema(); final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes); final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes);
final boolean nullable = isNullable(fieldSchema);
if (field.defaultVal() == JsonProperties.NULL_VALUE) { if (field.defaultVal() == JsonProperties.NULL_VALUE) {
recordFields.add(new RecordField(fieldName, fieldType, field.aliases())); recordFields.add(new RecordField(fieldName, fieldType, field.aliases(), nullable));
} else { } else {
recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases(), nullable));
} }
} }
@ -800,7 +801,7 @@ public class AvroTypeUtil {
final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
final List<RecordField> mapFields = new ArrayList<>(); final List<RecordField> mapFields = new ArrayList<>();
for (final String key : map.keySet()) { for (final String key : map.keySet()) {
mapFields.add(new RecordField(key, elementType)); mapFields.add(new RecordField(key, elementType, true));
} }
final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
return new MapRecord(mapSchema, map); return new MapRecord(mapSchema, map);

View File

@ -65,7 +65,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();
for (final String columnName : csvParser.getHeaderMap().keySet()) { for (final String columnName : csvParser.getHeaderMap().keySet()) {
fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType())); fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType(), true));
} }
return new SimpleRecordSchema(fields); return new SimpleRecordSchema(fields);

View File

@ -150,8 +150,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
String grokExpression = grok.getOriginalGrokPattern(); String grokExpression = grok.getOriginalGrokPattern();
populateSchemaFieldNames(grok, grokExpression, fields); populateSchemaFieldNames(grok, grokExpression, fields);
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true));
fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType())); fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true));
final RecordSchema schema = new SimpleRecordSchema(fields); final RecordSchema schema = new SimpleRecordSchema(fields);
return schema; return schema;