From f772f2f093de38a97e6a71988628b5fd5aa4139c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 6 Dec 2017 11:40:05 -0500 Subject: [PATCH] NIFI-4671: This closes #2328. Ensure that Avro Schemas that are created properly denote fields as being nullable iff the schemas says they are, for non-top-level fields Signed-off-by: joewitt --- .../nifi/serialization/record/ResultSetRecordSet.java | 11 ++++++++++- .../nifi/serialization/record/util/DataTypeUtils.java | 2 +- .../main/java/org/apache/nifi/avro/AvroTypeUtil.java | 7 ++++--- .../org/apache/nifi/csv/CSVHeaderSchemaStrategy.java | 2 +- .../main/java/org/apache/nifi/grok/GrokReader.java | 4 ++-- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index b6daab76c6..ad26d792f9 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -127,7 +127,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final DataType dataType = getDataType(sqlType, rs, column); final String fieldName = metadata.getColumnLabel(column); - final RecordField field = new RecordField(fieldName, dataType); + + final int nullableFlag = metadata.isNullable(column); + final boolean nullable; + if (nullableFlag == ResultSetMetaData.columnNoNulls) { + nullable = false; + } else { + nullable = true; + } + + final RecordField field = new RecordField(fieldName, dataType, nullable); fields.add(field); } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 55a4d6909c..ccd9270d3a 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -985,7 +985,7 @@ public class DataTypeUtils { dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType()); } - return new RecordField(fieldName, dataType, defaultValue, aliases); + return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable()); } public static boolean isScalarValue(final DataType dataType, final Object value) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index abc381fcf5..c5256c4679 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -289,11 +289,12 @@ public class AvroTypeUtil { final String fieldName = field.name(); final Schema fieldSchema = field.schema(); final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes); + final boolean nullable = isNullable(fieldSchema); if (field.defaultVal() == JsonProperties.NULL_VALUE) { - recordFields.add(new RecordField(fieldName, fieldType, field.aliases())); + recordFields.add(new RecordField(fieldName, fieldType, field.aliases(), nullable)); } else { - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases(), nullable)); } } @@ -800,7 +801,7 @@ public class AvroTypeUtil { final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); final List mapFields = new ArrayList<>(); for (final String key : map.keySet()) { - mapFields.add(new RecordField(key, elementType)); + mapFields.add(new RecordField(key, elementType, true)); } final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); return new MapRecord(mapSchema, map); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index 642f360817..9c31cca347 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -65,7 +65,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { final List fields = new ArrayList<>(); for (final String columnName : csvParser.getHeaderMap().keySet()) { - fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType(), true)); } return new SimpleRecordSchema(fields); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 4a26975b62..6eea8e3812 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -150,8 +150,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac String grokExpression = grok.getOriginalGrokPattern(); populateSchemaFieldNames(grok, grokExpression, fields); - fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); - fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true)); final RecordSchema schema = new SimpleRecordSchema(fields); return schema;