null
if no more records are available.
+ * Returns the next record in the stream or null
if no more records are available. Types will be coerced and any unknown fields will be dropped.
*
* @return the next record in the stream or null
if no more records are available.
*
* @throws IOException if unable to read from the underlying data
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
+ * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
*/
- Record nextRecord() throws IOException, MalformedRecordException;
+ default Record nextRecord() throws IOException, MalformedRecordException {
+ return nextRecord(true, true);
+ }
+
+ /**
+ * Reads the next record from the underlying stream. If type coercion is enabled, then any field in the Record whose type does not
+ * match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
+ * the correct type. If type coercion is disabled, then no type coercion will occur. As a result, calling
+ * {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
+ * may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
+ *
+ * @param coerceTypes whether or not fields in the Record should be validated against the schema and coerced when necessary
+ * @param dropUnknownFields if true
, any field that is found in the data that is not present in the schema will be dropped. If false
,
+ * those fields will still be part of the Record (though their type cannot be coerced, since the schema does not provide a type for it).
+ *
+ * @return the next record in the stream or null
if no more records are available
+ * @throws IOException if unable to read from the underlying data
+ * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
+ * that violates the schema and cannot be coerced into the appropriate field type.
+ * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
+ * field type and schema enforcement is enabled
+ */
+ Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
/**
* @return a RecordSchema that is appropriate for the records in the stream
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index 6c21a391f8..3e1c4ab2f7 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -24,24 +24,24 @@ import org.apache.nifi.serialization.record.Record;
public interface RecordWriter extends Closeable {
/**
- * Writes the given result set to the given output stream
+ * Writes the given record to the underlying stream
*
- * @param record the record set to serialize
+ * @param record the record to write
* @return the results of writing the data
- * @throws IOException if unable to write to the given OutputStream
+ * @throws IOException if unable to write to the underlying stream
*/
WriteResult write(Record record) throws IOException;
/**
- * @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using
+ * @return the MIME Type that the Record Writer produces. This will be added to FlowFiles using
* the mime.type attribute.
*/
String getMimeType();
/**
- * Flushes any buffered data to the underlying storage mechanism
+ * Flushes any buffered data to the underlying stream
*
- * @throws IOException if unable to write to the underlying storage mechanism
+ * @throws IOException if unable to write to the underlying stream
*/
void flush() throws IOException;
}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
similarity index 77%
rename from nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
rename to nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
index af5f90917c..796cc8f9a0 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SchemaValidationException.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.nifi.serialization.record;
+package org.apache.nifi.serialization;
-public class TypeMismatchException extends RuntimeException {
- public TypeMismatchException(String message) {
+public class SchemaValidationException extends RuntimeException {
+
+ public SchemaValidationException(final String message) {
super(message);
}
- public TypeMismatchException(String message, Throwable cause) {
+ public SchemaValidationException(final String message, final Throwable cause) {
super(message, cause);
}
+
}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index ca33e3225e..c3444ed2cd 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,12 +17,16 @@
package org.apache.nifi.serialization.record;
+import java.text.DateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -32,17 +36,67 @@ public class MapRecord implements Record {
private RecordSchema schema;
private final Maptrue
, then it is safe to assume that calling {@link #getValue(RecordField)} will return an Object of the appropriate
+ * type according to the schema, or an object that can be coerced into the appropriate type. If type checking
+ * is not enabled, then calling {@link #getValue(RecordField)} can return an object of any type.
+ *
+ * @return true
if type checking is enabled, false
otherwise.
+ */
+ boolean isTypeChecked();
+
+ /**
+ * If true
, any field that is added to the record will be drop unless the field is known by the schema
+ *
+ * @return true
if fields that are unknown to the schema will be dropped, false
+ * if all field values are retained.
+ */
+ boolean isDropUnknownFields();
+
/**
* Updates the Record's schema to to incorporate all of the fields in the given schema. If both schemas have a
* field with the same name but a different type, then the existing schema will be updated to have a
@@ -45,7 +64,9 @@ public interface Record {
/**
* - * Returns a view of the the values of the fields in this Record. + * Returns a view of the the values of the fields in this Record. Note that this method returns values only for + * those entries in the Record's schema. This allows the Record to guarantee that it will return the values in + * the order dictated by the schema. *
* * NOTE: The array that is returned may be an underlying array that is backing @@ -134,4 +155,13 @@ public interface Record { * name is not a Map */ void setMapValue(String fieldName, String mapKey, Object value); + + /** + * Returns a Set that contains the names of all of the fields that are present in the Record, regardless of + * whether or not those fields are contained in the schema. To determine which fields exist in the Schema, use + * {@link #getSchema()}.{@link RecordSchema#getFieldNames() getFieldNames()} instead. + * + * @return a Set that contains the names of all of the fields that are present in the Record + */ + Set