NIFI-4142: This closes #2015. Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord. Updated Record Reader to take two parameters for nextRecord: (boolean coerceTypes) and (boolean dropUnknownFields)

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-06-30 08:32:01 -04:00 committed by joewitt
parent 84935d4f78
commit 451f9cf124
46 changed files with 2934 additions and 255 deletions

View File

@ -92,6 +92,10 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes);
}
protected int incrementRecordCount() {
return ++recordCount;
}
/**
* Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses
* the chance to react to a new RecordSet beginning but prevents the subclass from changing how this

View File

@ -38,14 +38,37 @@ import org.apache.nifi.serialization.record.RecordSet;
public interface RecordReader extends Closeable {
/**
* Returns the next record in the stream or <code>null</code> if no more records are available.
* Returns the next record in the stream or <code>null</code> if no more records are available. Types will be coerced and any unknown fields will be dropped.
*
* @return the next record in the stream or <code>null</code> if no more records are available.
*
* @throws IOException if unable to read from the underlying data
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
* @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
*/
Record nextRecord() throws IOException, MalformedRecordException;
default Record nextRecord() throws IOException, MalformedRecordException {
return nextRecord(true, true);
}
/**
* Reads the next record from the underlying stream. If type coercion is enabled, then any field in the Record whose type does not
* match the schema will be coerced to the correct type and a MalformedRecordException will be thrown if unable to coerce the data into
* the correct type. If type coercion is disabled, then no type coercion will occur. As a result, calling
* {@link Record#getValue(org.apache.nifi.serialization.record.RecordField)}
* may return any type of Object, such as a String or another Record, even though the schema indicates that the field must be an integer.
*
* @param coerceTypes whether or not fields in the Record should be validated against the schema and coerced when necessary
* @param dropUnknownFields if <code>true</code>, any field that is found in the data that is not present in the schema will be dropped. If <code>false</code>,
* those fields will still be part of the Record (though their type cannot be coerced, since the schema does not provide a type for it).
*
* @return the next record in the stream or <code>null</code> if no more records are available
* @throws IOException if unable to read from the underlying data
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record, or a Record contains a field
* that violates the schema and cannot be coerced into the appropriate field type.
* @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate
* field type and schema enforcement is enabled
*/
Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
/**
* @return a RecordSchema that is appropriate for the records in the stream

View File

@ -24,24 +24,24 @@ import org.apache.nifi.serialization.record.Record;
public interface RecordWriter extends Closeable {
/**
* Writes the given result set to the given output stream
* Writes the given record to the underlying stream
*
* @param record the record set to serialize
* @param record the record to write
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
* @throws IOException if unable to write to the underlying stream
*/
WriteResult write(Record record) throws IOException;
/**
* @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using
* @return the MIME Type that the Record Writer produces. This will be added to FlowFiles using
* the mime.type attribute.
*/
String getMimeType();
/**
* Flushes any buffered data to the underlying storage mechanism
* Flushes any buffered data to the underlying stream
*
* @throws IOException if unable to write to the underlying storage mechanism
* @throws IOException if unable to write to the underlying stream
*/
void flush() throws IOException;
}

View File

@ -15,14 +15,16 @@
* limitations under the License.
*/
package org.apache.nifi.serialization.record;
package org.apache.nifi.serialization;
public class TypeMismatchException extends RuntimeException {
public TypeMismatchException(String message) {
public class SchemaValidationException extends RuntimeException {
public SchemaValidationException(final String message) {
super(message);
}
public TypeMismatchException(String message, Throwable cause) {
public SchemaValidationException(final String message, final Throwable cause) {
super(message, cause);
}
}

View File

@ -17,12 +17,16 @@
package org.apache.nifi.serialization.record;
import java.text.DateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -32,17 +36,67 @@ public class MapRecord implements Record {
private RecordSchema schema;
private final Map<String, Object> values;
private Optional<SerializedForm> serializedForm;
private final boolean checkTypes;
private final boolean dropUnknownFields;
public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
this(schema, values, false, false);
}
public MapRecord(final RecordSchema schema, final Map<String, Object> values, final boolean checkTypes, final boolean dropUnknownFields) {
Objects.requireNonNull(values);
this.schema = Objects.requireNonNull(schema);
this.values = Objects.requireNonNull(values);
this.values = checkTypes ? checkTypes(values, schema) : values;
this.serializedForm = Optional.empty();
this.checkTypes = checkTypes;
this.dropUnknownFields = dropUnknownFields;
}
public MapRecord(final RecordSchema schema, final Map<String, Object> values, final SerializedForm serializedForm) {
this(schema, values, serializedForm, false, false);
}
public MapRecord(final RecordSchema schema, final Map<String, Object> values, final SerializedForm serializedForm, final boolean checkTypes, final boolean dropUnknownFields) {
Objects.requireNonNull(values);
this.schema = Objects.requireNonNull(schema);
this.values = Objects.requireNonNull(values);
this.values = checkTypes ? checkTypes(values, schema) : values;
this.serializedForm = Optional.ofNullable(serializedForm);
this.checkTypes = checkTypes;
this.dropUnknownFields = dropUnknownFields;
}
private Map<String, Object> checkTypes(final Map<String, Object> values, final RecordSchema schema) {
for (final RecordField field : schema.getFields()) {
final Object value = getExplicitValue(field, values);
if (value == null) {
if (field.isNullable()) {
continue;
}
throw new SchemaValidationException("Field " + field.getFieldName() + " cannot be null");
}
if (!DataTypeUtils.isCompatibleDataType(value, field.getDataType())) {
throw new SchemaValidationException("Field " + field.getFieldName() + " has a value of " + value
+ ", which cannot be coerced into the appropriate data type of " + field.getDataType());
}
}
return values;
}
@Override
public boolean isDropUnknownFields() {
return dropUnknownFields;
}
@Override
public boolean isTypeChecked() {
return checkTypes;
}
@Override
@ -67,7 +121,11 @@ public class MapRecord implements Record {
return getValue(fieldOption.get());
}
return null;
if (dropUnknownFields) {
return null;
}
return this.values.get(fieldName);
}
@Override
@ -115,6 +173,10 @@ public class MapRecord implements Record {
}
private Object getExplicitValue(final RecordField field) {
return getExplicitValue(field, this.values);
}
private Object getExplicitValue(final RecordField field, final Map<String, Object> values) {
final String canonicalFieldName = field.getFieldName();
// We use containsKey here instead of just calling get() and checking for a null value
@ -139,11 +201,11 @@ public class MapRecord implements Record {
@Override
public String getAsString(final String fieldName) {
final Optional<DataType> dataTypeOption = schema.getDataType(fieldName);
if (!dataTypeOption.isPresent()) {
return null;
if (dataTypeOption.isPresent()) {
return convertToString(getValue(fieldName), dataTypeOption.get().getFormat());
}
return convertToString(getValue(fieldName), dataTypeOption.get().getFormat());
return DataTypeUtils.toString(getValue(fieldName), (Supplier<DateFormat>) null);
}
@Override
@ -239,11 +301,20 @@ public class MapRecord implements Record {
public void setValue(final String fieldName, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
if (dropUnknownFields) {
return;
}
final Object previousValue = values.put(fieldName, value);
if (!Objects.equals(value, previousValue)) {
serializedForm = Optional.empty();
}
return;
}
final RecordField recordField = field.get();
final Object coerced = DataTypeUtils.convertType(value, recordField.getDataType(), fieldName);
final Object coerced = isTypeChecked() ? DataTypeUtils.convertType(value, recordField.getDataType(), fieldName) : value;
final Object previousValue = values.put(recordField.getFieldName(), coerced);
if (!Objects.equals(coerced, previousValue)) {
serializedForm = Optional.empty();
@ -327,4 +398,9 @@ public class MapRecord implements Record {
public void incorporateSchema(RecordSchema other) {
this.schema = DataTypeUtils.merge(this.schema, other);
}
@Override
public Set<String> getRawFieldNames() {
return values.keySet();
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record;
import java.io.IOException;
import org.apache.nifi.serialization.WriteResult;
public interface RawRecordWriter {
/**
* Writes the given result set to the underlying stream
*
* @param record the record to write
* @return the results of writing the data
* @throws IOException if unable to write to the underlying stream
*/
WriteResult writeRawRecord(Record record) throws IOException;
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@ -26,6 +27,24 @@ public interface Record {
RecordSchema getSchema();
/**
* Indicates whether or not field values for this record are expected to be coerced into the type designated by the schema.
* If <code>true</code>, then it is safe to assume that calling {@link #getValue(RecordField)} will return an Object of the appropriate
* type according to the schema, or an object that can be coerced into the appropriate type. If type checking
* is not enabled, then calling {@link #getValue(RecordField)} can return an object of any type.
*
* @return <code>true</code> if type checking is enabled, <code>false</code> otherwise.
*/
boolean isTypeChecked();
/**
* If <code>true</code>, any field that is added to the record will be drop unless the field is known by the schema
*
* @return <code>true</code> if fields that are unknown to the schema will be dropped, <code>false</code>
* if all field values are retained.
*/
boolean isDropUnknownFields();
/**
* Updates the Record's schema to to incorporate all of the fields in the given schema. If both schemas have a
* field with the same name but a different type, then the existing schema will be updated to have a
@ -45,7 +64,9 @@ public interface Record {
/**
* <p>
* Returns a view of the the values of the fields in this Record.
* Returns a view of the the values of the fields in this Record. Note that this method returns values only for
* those entries in the Record's schema. This allows the Record to guarantee that it will return the values in
* the order dictated by the schema.
* </p>
*
* <b>NOTE:</b> The array that is returned may be an underlying array that is backing
@ -134,4 +155,13 @@ public interface Record {
* name is not a Map
*/
void setMapValue(String fieldName, String mapKey, Object value);
/**
* Returns a Set that contains the names of all of the fields that are present in the Record, regardless of
* whether or not those fields are contained in the schema. To determine which fields exist in the Schema, use
* {@link #getSchema()}.{@link RecordSchema#getFieldNames() getFieldNames()} instead.
*
* @return a Set that contains the names of all of the fields that are present in the Record
*/
Set<String> getRawFieldNames();
}

View File

@ -24,24 +24,43 @@ import java.util.Objects;
import java.util.Set;
public class RecordField {
private static final boolean DEFAULT_NULLABLE = true;
private final String fieldName;
private final DataType dataType;
private final Set<String> aliases;
private final Object defaultValue;
private final boolean nullable;
public RecordField(final String fieldName, final DataType dataType) {
this(fieldName, dataType, null, Collections.emptySet());
this(fieldName, dataType, null, Collections.emptySet(), DEFAULT_NULLABLE);
}
public RecordField(final String fieldName, final DataType dataType, final boolean nullable) {
this(fieldName, dataType, null, Collections.emptySet(), nullable);
}
public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) {
this(fieldName, dataType, defaultValue, Collections.emptySet());
this(fieldName, dataType, defaultValue, Collections.emptySet(), DEFAULT_NULLABLE);
}
public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final boolean nullable) {
this(fieldName, dataType, defaultValue, Collections.emptySet(), nullable);
}
public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) {
this(fieldName, dataType, null, aliases);
this(fieldName, dataType, null, aliases, DEFAULT_NULLABLE);
}
public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases, final boolean nullable) {
this(fieldName, dataType, null, aliases, nullable);
}
public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) {
this(fieldName, dataType, defaultValue, aliases, DEFAULT_NULLABLE);
}
public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases, final boolean nullable) {
if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue
+ "] because that is not a valid value for Data Type [" + dataType + "]");
@ -51,6 +70,7 @@ public class RecordField {
this.dataType = Objects.requireNonNull(dataType);
this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
this.defaultValue = defaultValue;
this.nullable = nullable;
}
public String getFieldName() {
@ -69,6 +89,10 @@ public class RecordField {
return defaultValue;
}
public boolean isNullable() {
return nullable;
}
@Override
public int hashCode() {
final int prime = 31;
@ -77,6 +101,7 @@ public class RecordField {
result = prime * result + fieldName.hashCode();
result = prime * result + aliases.hashCode();
result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode());
result = prime * result + Boolean.hashCode(nullable);
return result;
}
@ -94,11 +119,12 @@ public class RecordField {
}
RecordField other = (RecordField) obj;
return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue);
return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue)
&& nullable == other.nullable;
}
@Override
public String toString() {
return "RecordField[name=" + fieldName + ", dataType=" + dataType + (aliases.isEmpty() ? "" : ", aliases=" + aliases) + "]";
return "RecordField[name=" + fieldName + ", dataType=" + dataType + (aliases.isEmpty() ? "" : ", aliases=" + aliases) + ", nullable=" + nullable + "]";
}
}

View File

@ -17,7 +17,9 @@
package org.apache.nifi.serialization.record.util;
public class IllegalTypeConversionException extends RuntimeException {
import org.apache.nifi.serialization.SchemaValidationException;
public class IllegalTypeConversionException extends SchemaValidationException {
public IllegalTypeConversionException(final String message) {
super(message);

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.validation;
import org.apache.nifi.serialization.record.Record;
public interface RecordSchemaValidator {
SchemaValidationResult validate(Record record);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.validation;
import java.util.Collection;
public interface SchemaValidationResult {
boolean isValid();
Collection<ValidationError> getValidationErrors();
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.validation;
public interface ValidationContext {
boolean isExtraFieldAllowed();
boolean isTypeCoercionAllowed();
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.validation;
import java.util.Optional;
public interface ValidationError {
Optional<String> getFieldName();
Optional<Object> getInputValue();
String getExplanation();
ValidationErrorType getType();
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.validation;
public enum ValidationErrorType {
/**
* A required field (i.e., a field that is not 'nullable') exists in the schema, but the record had no value for this field
* or the value for this field was <code>null</code>.
*/
MISSING_FIELD,
/**
* The record had a field that was not valid according to the schema.
*/
EXTRA_FIELD,
/**
* The record had a value for a field, but the value was not valid according to the schema.
*/
INVALID_FIELD,
/**
* Some other sort of validation error occurred.
*/
OTHER;
}

View File

@ -75,6 +75,7 @@ public class AvroTypeUtil {
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
public static Schema extractAvroSchema(final RecordSchema recordSchema) {
if (recordSchema == null) {
throw new IllegalArgumentException("RecordSchema cannot be null");
@ -110,7 +111,7 @@ public class AvroTypeUtil {
}
private static Field buildAvroField(final RecordField recordField) {
final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName());
final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
for (final String alias : recordField.getAliases()) {
field.addAlias(alias);
@ -119,7 +120,7 @@ public class AvroTypeUtil {
return field;
}
private static Schema buildAvroSchema(final DataType dataType, final String fieldName) {
private static Schema buildAvroSchema(final DataType dataType, final String fieldName, final boolean nullable) {
final Schema schema;
switch (dataType.getFieldType()) {
@ -129,7 +130,7 @@ public class AvroTypeUtil {
if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
schema = Schema.create(Type.BYTES);
} else {
final Schema elementType = buildAvroSchema(elementDataType, fieldName);
final Schema elementType = buildAvroSchema(elementDataType, fieldName, false);
schema = Schema.createArray(elementType);
}
break;
@ -151,7 +152,7 @@ public class AvroTypeUtil {
final List<Schema> unionTypes = new ArrayList<>(options.size());
for (final DataType option : options) {
unionTypes.add(buildAvroSchema(option, fieldName));
unionTypes.add(buildAvroSchema(option, fieldName, false));
}
schema = Schema.createUnion(unionTypes);
@ -173,7 +174,7 @@ public class AvroTypeUtil {
schema = Schema.create(Type.LONG);
break;
case MAP:
schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName));
schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, false));
break;
case RECORD:
final RecordDataType recordDataType = (RecordDataType) dataType;
@ -204,7 +205,11 @@ public class AvroTypeUtil {
return null;
}
return nullable(schema);
if (nullable) {
return nullable(schema);
} else {
return schema;
}
}
private static Schema nullable(final Schema schema) {
@ -362,12 +367,14 @@ public class AvroTypeUtil {
final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
for (final Field field : avroSchema.getFields()) {
final String fieldName = field.name();
final DataType dataType = AvroTypeUtil.determineDataType(field.schema(), knownRecords);
final Schema fieldSchema = field.schema();
final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
final boolean nullable = isNullable(fieldSchema);
if (field.defaultVal() == JsonProperties.NULL_VALUE) {
recordFields.add(new RecordField(fieldName, dataType, field.aliases()));
recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable));
} else {
recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases()));
recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases(), nullable));
}
}
@ -375,6 +382,19 @@ public class AvroTypeUtil {
return recordSchema;
}
public static boolean isNullable(final Schema schema) {
final Type schemaType = schema.getType();
if (schemaType == Type.UNION) {
for (final Schema unionSchema : schema.getTypes()) {
if (isNullable(unionSchema)) {
return true;
}
}
}
return schemaType == Type.NULL;
}
public static Object[] convertByteArray(final byte[] bytes) {
final Object[] array = new Object[bytes.length];
for (int i = 0; i < bytes.length; i++) {

View File

@ -70,7 +70,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
if (failAfterN > -1 && recordCount >= failAfterN) {
throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
}

View File

@ -76,7 +76,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
if (failAfterN >= recordCount) {
throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.validation;
import org.apache.nifi.serialization.record.RecordSchema;
public class SchemaValidationContext {
private final RecordSchema schema;
private final boolean allowExtraFields;
private final boolean strictTypeChecking;
public SchemaValidationContext(final RecordSchema schema, final boolean allowExtraFields, final boolean strictTypeChecking) {
this.schema = schema;
this.allowExtraFields = allowExtraFields;
this.strictTypeChecking = strictTypeChecking;
}
public RecordSchema getSchema() {
return schema;
}
public boolean isExtraFieldAllowed() {
return allowExtraFields;
}
public boolean isStrictTypeChecking() {
return strictTypeChecking;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.validation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
public class StandardSchemaValidationResult implements SchemaValidationResult {
private final List<ValidationError> validationErrors = new ArrayList<>();
@Override
public boolean isValid() {
return validationErrors.isEmpty();
}
@Override
public Collection<ValidationError> getValidationErrors() {
return Collections.unmodifiableList(validationErrors);
}
public void addValidationError(final ValidationError validationError) {
this.validationErrors.add(validationError);
}
}

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.validation;
import java.math.BigInteger;
import java.util.Map;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
import org.apache.nifi.serialization.record.validation.ValidationErrorType;
public class StandardSchemaValidator implements RecordSchemaValidator {
private final SchemaValidationContext validationContext;
public StandardSchemaValidator(final SchemaValidationContext validationContext) {
this.validationContext = validationContext;
}
@Override
public SchemaValidationResult validate(final Record record) {
return validate(record, validationContext.getSchema(), "");
}
private SchemaValidationResult validate(final Record record, final RecordSchema schema, final String fieldPrefix) {
// Ensure that for every field in the schema, the type is correct (if we care) and that
// a value is present (unless it is nullable).
final StandardSchemaValidationResult result = new StandardSchemaValidationResult();
for (final RecordField field : schema.getFields()) {
final Object rawValue = record.getValue(field);
// If there is no value, then it is always valid unless the field is required.
if (rawValue == null) {
if (!field.isNullable() && field.getDefaultValue() == null) {
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), ValidationErrorType.MISSING_FIELD, "Field is required"));
}
continue;
}
// Check that the type is correct.
final DataType dataType = field.getDataType();
if (validationContext.isStrictTypeChecking()) {
if (!isTypeCorrect(rawValue, dataType)) {
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
continue;
}
} else {
// Use a lenient type check. This will be true if, for instance, a value is the String "123" and should be an integer
// but will be false if the value is "123" and should be an Array or Record.
if (!DataTypeUtils.isCompatibleDataType(rawValue, dataType)) {
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
continue;
}
}
// If the field type is RECORD, or if the field type is a CHOICE that allows for a RECORD and the value is a RECORD, then we
// need to dig into each of the sub-fields. To do this, we first need to determine the 'canonical data type'.
final DataType canonicalDataType = getCanonicalDataType(dataType, rawValue, result, fieldPrefix, field);
if (canonicalDataType == null) {
continue;
}
// Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field.
verifyComplexType(dataType, rawValue, result, fieldPrefix, field);
}
if (!validationContext.isExtraFieldAllowed()) {
for (final String fieldName : record.getRawFieldNames()) {
if (!schema.getDataType(fieldName).isPresent()) {
result.addValidationError(new StandardValidationError(fieldPrefix + "/" + fieldName, ValidationErrorType.EXTRA_FIELD, "Field is not present in the schema"));
}
}
}
return result;
}
private void verifyComplexType(final DataType dataType, final Object rawValue, final StandardSchemaValidationResult result, final String fieldPrefix, final RecordField field) {
// If the field type is RECORD, or if the field type is a CHOICE that allows for a RECORD and the value is a RECORD, then we
// need to dig into each of the sub-fields. To do this, we first need to determine the 'canonical data type'.
final DataType canonicalDataType = getCanonicalDataType(dataType, rawValue, result, fieldPrefix, field);
if (canonicalDataType == null) {
return;
}
// Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field.
if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
verifyChildRecord(canonicalDataType, rawValue, dataType, result, field, fieldPrefix);
}
if (canonicalDataType.getFieldType() == RecordFieldType.ARRAY) {
final ArrayDataType arrayDataType = (ArrayDataType) canonicalDataType;
final DataType elementType = arrayDataType.getElementType();
final Object[] arrayObject = (Object[]) rawValue;
int i=0;
for (final Object arrayValue : arrayObject) {
verifyComplexType(elementType, arrayValue, result, fieldPrefix + "[" + i + "]", field);
i++;
}
}
}
private DataType getCanonicalDataType(final DataType dataType, final Object rawValue, final StandardSchemaValidationResult result, final String fieldPrefix, final RecordField field) {
final RecordFieldType fieldType = dataType.getFieldType();
final DataType canonicalDataType;
if (fieldType == RecordFieldType.CHOICE) {
canonicalDataType = DataTypeUtils.chooseDataType(rawValue, (ChoiceDataType) dataType);
if (canonicalDataType == null) {
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + dataType));
return null;
}
} else {
canonicalDataType = dataType;
}
return canonicalDataType;
}
private void verifyChildRecord(final DataType canonicalDataType, final Object rawValue, final DataType expectedDataType, final StandardSchemaValidationResult result,
final RecordField field, final String fieldPrefix) {
// Now that we have the 'canonical data type', we check if it is a Record. If so, we need to validate each sub-field.
if (canonicalDataType.getFieldType() == RecordFieldType.RECORD) {
if (!(rawValue instanceof Record)) { // sanity check
result.addValidationError(new StandardValidationError(concat(fieldPrefix, field), rawValue, ValidationErrorType.INVALID_FIELD,
"Value is of type " + rawValue.getClass().getName() + " but was expected to be of type " + expectedDataType));
return;
}
final RecordDataType recordDataType = (RecordDataType) canonicalDataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
final String fullChildFieldName = concat(fieldPrefix, field);
final SchemaValidationResult childValidationResult = validate((Record) rawValue, childSchema, fullChildFieldName);
if (childValidationResult.isValid()) {
return;
}
for (final ValidationError validationError : childValidationResult.getValidationErrors()) {
result.addValidationError(validationError);
}
}
}
private boolean isTypeCorrect(final Object value, final DataType dataType) {
switch (dataType.getFieldType()) {
case ARRAY:
if (!(value instanceof Object[])) {
return false;
}
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementType = arrayDataType.getElementType();
final Object[] array = (Object[]) value;
for (final Object arrayVal : array) {
if (!isTypeCorrect(arrayVal, elementType)) {
return false;
}
}
return true;
case MAP:
if (!(value instanceof Map)) {
return false;
}
final MapDataType mapDataType = (MapDataType) dataType;
final DataType valueDataType = mapDataType.getValueType();
final Map<?, ?> map = (Map<?, ?>) value;
for (final Object mapValue : map.values()) {
if (!isTypeCorrect(mapValue, valueDataType)) {
return false;
}
}
return true;
case RECORD:
return value instanceof Record;
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
for (final DataType choice : choiceDataType.getPossibleSubTypes()) {
if (isTypeCorrect(value, choice)) {
return true;
}
}
return false;
case BIGINT:
return value instanceof BigInteger;
case BOOLEAN:
return value instanceof Boolean;
case BYTE:
return value instanceof Byte;
case CHAR:
return value instanceof Character;
case DATE:
return value instanceof java.sql.Date;
case DOUBLE:
return value instanceof Double;
case FLOAT:
// Some readers do not provide float vs. double.
// We should consider if it makes sense to allow either a Float or a Double here or have
// a Reader indicate whether or not it supports higher precision, etc.
// Same goes for Short/Integer
return value instanceof Float;
case INT:
return value instanceof Integer;
case LONG:
return value instanceof Long;
case SHORT:
return value instanceof Short;
case STRING:
return value instanceof String;
case TIME:
return value instanceof java.sql.Time;
case TIMESTAMP:
return value instanceof java.sql.Timestamp;
}
return false;
}
private String concat(final String fieldPrefix, final RecordField field) {
return fieldPrefix + "/" + field.getFieldName();
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.validation;
import java.util.Objects;
import java.util.Optional;
import org.apache.nifi.serialization.record.validation.ValidationError;
import org.apache.nifi.serialization.record.validation.ValidationErrorType;
public class StandardValidationError implements ValidationError {
private final Optional<String> fieldName;
private final Optional<Object> inputValue;
private final String explanation;
private final ValidationErrorType type;
public StandardValidationError(final String fieldName, final Object value, final ValidationErrorType type, final String explanation) {
this.fieldName = Optional.ofNullable(fieldName);
this.inputValue = Optional.ofNullable(value);
this.type = type;
this.explanation = explanation;
}
public StandardValidationError(final String fieldName, final ValidationErrorType type, final String explanation) {
this.fieldName = Optional.ofNullable(fieldName);
this.inputValue = Optional.empty();
this.type = type;
this.explanation = Objects.requireNonNull(explanation);
}
public StandardValidationError(final ValidationErrorType type, final String explanation) {
this.fieldName = Optional.empty();
this.inputValue = Optional.empty();
this.type = type;
this.explanation = Objects.requireNonNull(explanation);
}
@Override
public ValidationErrorType getType() {
return type;
}
@Override
public Optional<String> getFieldName() {
return fieldName;
}
@Override
public Optional<Object> getInputValue() {
return inputValue;
}
@Override
public String getExplanation() {
return explanation;
}
@Override
public String toString() {
if (fieldName.isPresent()) {
if (inputValue.isPresent()) {
final Object input = inputValue.get();
if (input instanceof Object[]) {
final StringBuilder sb = new StringBuilder("[");
final Object[] array = (Object[]) input;
for (int i=0; i < array.length; i++) {
final Object arrayValue = array[i];
if (arrayValue instanceof String) {
sb.append('"').append(array[i]).append('"');
} else {
sb.append(array[i]);
}
if (i < array.length - 1) {
sb.append(", ");
}
}
sb.append("]");
return sb.toString() + " is not a valid value for " + fieldName.get() + ": " + explanation;
} else {
return inputValue.get() + " is not a valid value for " + fieldName.get() + ": " + explanation;
}
} else {
return fieldName.get() + " is invalid due to: " + explanation;
}
}
return explanation;
}
@Override
public int hashCode() {
return 31 + 17 * fieldName.hashCode() + 17 * inputValue.hashCode() + 17 * explanation.hashCode();
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof ValidationError)) {
return false;
}
final ValidationError other = (ValidationError) obj;
return getFieldName().equals(other.getFieldName()) && getInputValue().equals(other.getInputValue()) && getExplanation().equals(other.getExplanation());
}
}

View File

@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.validation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
import org.junit.Test;
public class TestStandardSchemaValidator {
@Test
public void testValidateCorrectSimpleTypesStrictValidation() throws ParseException {
final List<RecordField> fields = new ArrayList<>();
for (final RecordFieldType fieldType : RecordFieldType.values()) {
if (fieldType == RecordFieldType.CHOICE) {
final List<DataType> possibleTypes = new ArrayList<>();
possibleTypes.add(RecordFieldType.INT.getDataType());
possibleTypes.add(RecordFieldType.LONG.getDataType());
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
} else if (fieldType == RecordFieldType.MAP) {
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getMapDataType(RecordFieldType.INT.getDataType())));
} else {
fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
}
}
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long time = df.parse("2017/01/01 17:00:00.000").getTime();
final Map<String, Object> intMap = new LinkedHashMap<>();
intMap.put("height", 48);
intMap.put("width", 96);
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("string", "string");
valueMap.put("boolean", true);
valueMap.put("byte", (byte) 1);
valueMap.put("char", 'c');
valueMap.put("short", (short) 8);
valueMap.put("int", 9);
valueMap.put("bigint", BigInteger.valueOf(8L));
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
valueMap.put("date", new Date(time));
valueMap.put("time", new Time(time));
valueMap.put("timestamp", new Timestamp(time));
valueMap.put("record", null);
valueMap.put("array", null);
valueMap.put("choice", 48L);
valueMap.put("map", intMap);
final Record record = new MapRecord(schema, valueMap);
final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true);
final StandardSchemaValidator validator = new StandardSchemaValidator(validationContext);
final SchemaValidationResult result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
}
@Test
public void testValidateWrongButCoerceableType() throws ParseException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
Record record = new MapRecord(schema, valueMap);
final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true);
final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false);
// Validate with correct type of int and a strict validation
StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext);
SchemaValidationResult result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
// Validate with correct type of int and a lenient validation
validator = new StandardSchemaValidator(lenientValidationContext);
result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
// Update Map to set value to a String that is coerceable to an int
valueMap.put("id", "1");
record = new MapRecord(schema, valueMap);
// Validate with incorrect type of string and a strict validation
validator = new StandardSchemaValidator(strictValidationContext);
result = validator.validate(record);
assertFalse(result.isValid());
final Collection<ValidationError> validationErrors = result.getValidationErrors();
assertEquals(1, validationErrors.size());
final ValidationError validationError = validationErrors.iterator().next();
assertEquals("/id", validationError.getFieldName().get());
// Validate with incorrect type of string and a lenient validation
validator = new StandardSchemaValidator(lenientValidationContext);
result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
}
@Test
public void testMissingRequiredField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType(), false));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
final Record record = new MapRecord(schema, valueMap, false, false);
final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true);
StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext);
SchemaValidationResult result = validator.validate(record);
assertFalse(result.isValid());
assertNotNull(result.getValidationErrors());
final ValidationError error = result.getValidationErrors().iterator().next();
assertEquals("/name", error.getFieldName().get());
}
@Test
public void testMissingNullableField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
Record record = new MapRecord(schema, valueMap, false, false);
final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true);
StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext);
SchemaValidationResult result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
}
@Test
public void testExtraFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
valueMap.put("name", "John Doe");
Record record = new MapRecord(schema, valueMap, false, false);
final SchemaValidationContext allowExtraFieldsContext = new SchemaValidationContext(schema, true, true);
final SchemaValidationContext forbidExtraFieldsContext = new SchemaValidationContext(schema, false, false);
StandardSchemaValidator validator = new StandardSchemaValidator(allowExtraFieldsContext);
SchemaValidationResult result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
validator = new StandardSchemaValidator(forbidExtraFieldsContext);
result = validator.validate(record);
assertFalse(result.isValid());
assertNotNull(result.getValidationErrors());
final Collection<ValidationError> validationErrors = result.getValidationErrors();
assertEquals(1, validationErrors.size());
final ValidationError validationError = validationErrors.iterator().next();
assertEquals("/name", validationError.getFieldName().get());
System.out.println(validationError);
}
@Test
public void testInvalidEmbeddedField() {
final List<RecordField> accountFields = new ArrayList<>();
accountFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema)));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> accountValues = new HashMap<>();
accountValues.put("name", "account-1");
accountValues.put("balance", "123.45");
final Record accountRecord = new MapRecord(accountSchema, accountValues);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
valueMap.put("account", accountRecord);
Record record = new MapRecord(schema, valueMap, false, false);
final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true);
final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false);
StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext);
SchemaValidationResult result = validator.validate(record);
assertFalse(result.isValid());
assertNotNull(result.getValidationErrors());
assertEquals(1, result.getValidationErrors().size());
final ValidationError validationError = result.getValidationErrors().iterator().next();
assertEquals("/account/balance", validationError.getFieldName().get());
validator = new StandardSchemaValidator(lenientValidationContext);
result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
}
@Test
public void testInvalidArrayValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("numbers", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> valueMap = new LinkedHashMap<>();
valueMap.put("id", 1);
valueMap.put("numbers", new Object[] {1, "2", "3"});
Record record = new MapRecord(schema, valueMap, false, false);
final SchemaValidationContext strictValidationContext = new SchemaValidationContext(schema, false, true);
final SchemaValidationContext lenientValidationContext = new SchemaValidationContext(schema, false, false);
StandardSchemaValidator validator = new StandardSchemaValidator(strictValidationContext);
SchemaValidationResult result = validator.validate(record);
assertFalse(result.isValid());
assertNotNull(result.getValidationErrors());
assertEquals(1, result.getValidationErrors().size());
final ValidationError validationError = result.getValidationErrors().iterator().next();
assertEquals("/numbers", validationError.getFieldName().get());
validator = new StandardSchemaValidator(lenientValidationContext);
result = validator.validate(record);
assertTrue(result.isValid());
assertNotNull(result.getValidationErrors());
assertTrue(result.getValidationErrors().isEmpty());
}
}

View File

@ -33,6 +33,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@ -74,7 +75,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException, SchemaValidationException {
if (failAfterN >= recordCount) {
throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
}

View File

@ -44,7 +44,7 @@ class GroovyRecordReader implements RecordReader {
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
].iterator()
Record nextRecord() throws IOException, MalformedRecordException {
Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
return recordIterator.hasNext() ? recordIterator.next() : null
}

View File

@ -51,7 +51,7 @@ class GroovyXmlRecordReader implements RecordReader {
}.iterator()
}
Record nextRecord() throws IOException, MalformedRecordException {
Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
return recordIterator?.hasNext() ? recordIterator.next() : null
}

View File

@ -9,7 +9,8 @@
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
@ -356,6 +357,11 @@
<artifactId>calcite-core</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
</dependencies>
<build>
@ -498,7 +504,8 @@
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
<!-- This file is copied from https://github.com/jeremyh/jBCrypt
because the binary is compiled for Java 8 and we must support Java 7 -->
<exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
</excludes>
</configuration>

View File

@ -0,0 +1,457 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.validation.SchemaValidationContext;
import org.apache.nifi.schema.validation.StandardSchemaValidator;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"record", "schema", "validate"})
@CapabilityDescription("Validates the Records of an incoming FlowFile against a given schema. All records that adhere to the schema are routed to the \"valid\" relationship while "
+ "records that do not adhere to hte schema are routed to the \"invalid\" relationship. It is therefore possible for a single incoming FlowFile to be split into two individual "
+ "FlowFiles if some records are valid according to the schema and others are not. Any FlowFile that is routed to the \"invalid\" relationship will emit a ROUTE Provenance Event "
+ "with the Details field populated to explain why records were invalid. In addition, to gain further explanation of why records were invalid, DEBUG-level logging can be enabled "
+ "for the \"org.apache.nifi.processors.standard.ValidateRecord\" logger.")
public class ValidateRecord extends AbstractProcessor {
static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name-property", "Use Schema Name Property",
"The schema to validate the data against is determined by looking at the 'Schema Name' Property and looking up the schema in the configured Schema Registry");
static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use Schema Text Property",
"The schema to validate the data against is determined by looking at the 'Schema Text' Property and parsing the schema as an Avro schema");
static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Use Reader's Schema",
"The schema to validate the data against is determined by asking the configured Record Reader for its schema");
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
.name("schema-access-strategy")
.displayName("Schema Access Strategy")
.description("Specifies how to obtain the schema that should be used to validate records")
.allowableValues(READER_SCHEMA, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY)
.defaultValue(READER_SCHEMA.getValue())
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to \"Use 'Schema Name' Property\".")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.build();
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("schema-name")
.displayName("Schema Name")
.description("Specifies the name of the schema to lookup in the Schema Registry property")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${schema.name}")
.required(false)
.build();
static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name("schema-text")
.displayName("Schema Text")
.description("The text of an Avro-formatted Schema")
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(true)
.defaultValue("${avro.schema}")
.required(false)
.build();
static final PropertyDescriptor ALLOW_EXTRA_FIELDS = new PropertyDescriptor.Builder()
.name("allow-extra-fields")
.displayName("Allow Extra Fields")
.description("If the incoming data has fields that are not present in the schema, this property determines whether or not the Record is valid. "
+ "If true, the Record is still valid. If false, the Record will be invalid due to the extra fields.")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
static final PropertyDescriptor STRICT_TYPE_CHECKING = new PropertyDescriptor.Builder()
.name("strict-type-checking")
.displayName("Strict Type Checking")
.description("If the incoming data has a Record where a field is not of the correct type, this property determine whether how to handle the Record. "
+ "If true, the Record will still be considered invalid. If false, the Record will be considered valid and the field will be coerced into the "
+ "correct type (if possible, according to the type coercion supported by the Record Writer).")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
static final Relationship REL_VALID = new Relationship.Builder()
.name("valid")
.description("Records that are valid according to the schema will be routed to this relationship")
.build();
static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
.description("Records that are not valid according to the schema will be routed to this relationship")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(SCHEMA_ACCESS_STRATEGY);
properties.add(SCHEMA_REGISTRY);
properties.add(SCHEMA_NAME);
properties.add(SCHEMA_TEXT);
properties.add(ALLOW_EXTRA_FIELDS);
properties.add(STRICT_TYPE_CHECKING);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_VALID);
relationships.add(REL_INVALID);
relationships.add(REL_FAILURE);
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final String schemaAccessStrategy = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
if (!validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
return Collections.singleton(new ValidationResult.Builder()
.subject("Schema Registry")
.valid(false)
.explanation("If the Schema Access Strategy is set to \"Use 'Schema Name' Property\", the Schema Registry property must also be set")
.build());
}
final SchemaRegistry registry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
if (!registry.getSuppliedSchemaFields().contains(SchemaField.SCHEMA_NAME)) {
return Collections.singleton(new ValidationResult.Builder()
.subject("Schema Registry")
.valid(false)
.explanation("The configured Schema Registry does not support accessing schemas by name")
.build());
}
}
return Collections.emptyList();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
final boolean strictTypeChecking = context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
RecordSetWriter validWriter = null;
RecordSetWriter invalidWriter = null;
FlowFile validFlowFile = null;
FlowFile invalidFlowFile = null;
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
final RecordSchema validationSchema = getValidationSchema(context, flowFile, reader);
final SchemaValidationContext validationContext = new SchemaValidationContext(validationSchema, allowExtraFields, strictTypeChecking);
final RecordSchemaValidator validator = new StandardSchemaValidator(validationContext);
int recordCount = 0;
int validCount = 0;
int invalidCount = 0;
final Set<String> extraFields = new HashSet<>();
final Set<String> missingFields = new HashSet<>();
final Set<String> invalidFields = new HashSet<>();
final Set<String> otherProblems = new HashSet<>();
try {
Record record;
while ((record = reader.nextRecord(false, false)) != null) {
final SchemaValidationResult result = validator.validate(record);
recordCount++;
RecordSetWriter writer;
if (result.isValid()) {
validCount++;
if (validFlowFile == null) {
validFlowFile = session.create(flowFile);
}
validWriter = writer = createIfNecessary(validWriter, writerFactory, session, validFlowFile, record.getSchema());
} else {
invalidCount++;
logValidationErrors(flowFile, recordCount, result);
if (invalidFlowFile == null) {
invalidFlowFile = session.create(flowFile);
}
invalidWriter = writer = createIfNecessary(invalidWriter, writerFactory, session, invalidFlowFile, record.getSchema());
// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
// that it is too noisy to be useful.
for (final ValidationError validationError : result.getValidationErrors()) {
final Optional<String> fieldName = validationError.getFieldName();
switch (validationError.getType()) {
case EXTRA_FIELD:
if (fieldName.isPresent()) {
extraFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case MISSING_FIELD:
if (fieldName.isPresent()) {
missingFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case INVALID_FIELD:
if (fieldName.isPresent()) {
invalidFields.add(fieldName.get());
} else {
otherProblems.add(validationError.getExplanation());
}
break;
case OTHER:
otherProblems.add(validationError.getExplanation());
break;
}
}
}
if (writer instanceof RawRecordWriter) {
((RawRecordWriter) writer).writeRawRecord(record);
} else {
writer.write(record);
}
}
if (validWriter != null) {
completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null);
}
if (invalidWriter != null) {
// Build up a String that explains why the records were invalid, so that we can add this to the Provenance Event.
final StringBuilder errorBuilder = new StringBuilder();
errorBuilder.append("Records in this FlowFile were invalid for the following reasons: ");
if (!missingFields.isEmpty()) {
errorBuilder.append("The following ").append(missingFields.size()).append(" fields were missing: ").append(missingFields.toString());
}
if (!extraFields.isEmpty()) {
if (errorBuilder.length() > 0) {
errorBuilder.append("; ");
}
errorBuilder.append("The following ").append(extraFields.size())
.append(" fields were present in the Record but not in the schema: ").append(extraFields.toString());
}
if (!invalidFields.isEmpty()) {
if (errorBuilder.length() > 0) {
errorBuilder.append("; ");
}
errorBuilder.append("The following ").append(invalidFields.size())
.append(" fields had values whose type did not match the schema: ").append(invalidFields.toString());
}
if (!otherProblems.isEmpty()) {
if (errorBuilder.length() > 0) {
errorBuilder.append("; ");
}
errorBuilder.append("The following ").append(otherProblems.size())
.append(" additional problems were encountered: ").append(otherProblems.toString());
}
final String validationErrorString = errorBuilder.toString();
completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
}
} finally {
closeQuietly(validWriter);
closeQuietly(invalidWriter);
}
session.adjustCounter("Records Validated", recordCount, false);
session.adjustCounter("Records Found Valid", validCount, false);
session.adjustCounter("Records Found Invalid", invalidCount, false);
} catch (final IOException | MalformedRecordException | SchemaNotFoundException e) {
getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
if (validFlowFile != null) {
session.remove(validFlowFile);
}
if (invalidFlowFile != null) {
session.remove(invalidFlowFile);
}
return;
}
session.remove(flowFile);
}
private void closeQuietly(final RecordSetWriter writer) {
if (writer != null) {
try {
writer.close();
} catch (final Exception e) {
getLogger().error("Failed to close Record Writer", e);
}
}
}
private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException {
final WriteResult writeResult = writer.finishRecordSet();
writer.close();
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, relationship);
session.getProvenanceReporter().route(flowFile, relationship, details);
}
private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final RecordSetWriterFactory factory, final ProcessSession session,
final FlowFile flowFile, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException {
if (writer != null) {
return writer;
}
final OutputStream out = session.write(flowFile);
final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, flowFile, out);
created.beginRecordSet();
return created;
}
private void logValidationErrors(final FlowFile flowFile, final int recordCount, final SchemaValidationResult result) {
if (getLogger().isDebugEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("For ").append(flowFile).append(" Record #").append(recordCount).append(" is invalid due to:\n");
for (final ValidationError error : result.getValidationErrors()) {
sb.append(error).append("\n");
}
getLogger().debug(sb.toString());
}
}
protected RecordSchema getValidationSchema(final ProcessContext context, final FlowFile flowFile, final RecordReader reader)
throws MalformedRecordException, IOException, SchemaNotFoundException {
final String schemaAccessStrategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
if (schemaAccessStrategy.equals(READER_SCHEMA.getValue())) {
return reader.getSchema();
} else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
return schemaRegistry.retrieveSchema(schemaName);
} else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
final Parser parser = new Schema.Parser();
final Schema avroSchema = parser.parse(schemaText);
return AvroTypeUtil.createSchema(avroSchema);
} else {
throw new ProcessException("Invalid Schema Access Strategy: " + schemaAccessStrategy);
}
}
}

View File

@ -102,6 +102,7 @@ org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.ValidateCsv
org.apache.nifi.processors.standard.ValidateRecord
org.apache.nifi.processors.standard.Wait
org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache

View File

@ -32,7 +32,7 @@ public abstract class AvroRecordReader implements RecordReader {
protected abstract GenericRecord nextAvroRecord() throws IOException;
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
GenericRecord record = nextAvroRecord();
if (record == null) {
return null;

View File

@ -34,6 +34,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.DateTimeUtils;
@ -57,6 +58,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
private volatile boolean firstLineIsHeader;
private volatile boolean ignoreHeader;
@Override
@ -67,7 +70,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
properties.add(CSVUtils.CSV_FORMAT);
properties.add(CSVUtils.VALUE_SEPARATOR);
properties.add(CSVUtils.SKIP_HEADER_LINE);
properties.add(CSVUtils.FIRST_LINE_IS_HEADER);
properties.add(CSVUtils.IGNORE_CSV_HEADER);
properties.add(CSVUtils.QUOTE_CHAR);
properties.add(CSVUtils.ESCAPE_CHAR);
properties.add(CSVUtils.COMMENT_MARKER);
@ -82,6 +86,16 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean();
this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean();
// Ensure that if we are deriving schema from header that we always treat the first line as a header,
// regardless of the 'First Line is Header' property
final String accessStrategy = context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
if (headerDerivedAllowableValue.getValue().equals(accessStrategy)) {
this.csvFormat = this.csvFormat.withFirstRecordAsHeader();
this.firstLineIsHeader = true;
}
}
@Override
@ -92,7 +106,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null);
bufferedIn.reset();
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat);
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat);
}
@Override

View File

@ -22,8 +22,13 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.text.DateFormat;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.commons.csv.CSVFormat;
@ -36,7 +41,6 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -49,7 +53,9 @@ public class CSVRecordReader implements RecordReader {
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
private List<String> rawFieldNames;
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
this.schema = schema;
@ -62,48 +68,78 @@ public class CSVRecordReader implements RecordReader {
LAZY_TIMESTAMP_FORMAT = () -> tsf;
final Reader reader = new InputStreamReader(new BOMInputStream(in));
final CSVFormat withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
CSVFormat withHeader;
if (hasHeader) {
withHeader = csvFormat.withSkipHeaderRecord();
if (ignoreHeader) {
withHeader = withHeader.withHeader(schema.getFieldNames().toArray(new String[0]));
}
} else {
withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
}
csvParser = new CSVParser(reader, withHeader);
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
final RecordSchema schema = getSchema();
final List<String> rawFieldNames = getRawFieldNames();
final int numFieldNames = rawFieldNames.size();
for (final CSVRecord csvRecord : csvParser) {
final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
final Map<String, Object> values = new LinkedHashMap<>();
for (int i = 0; i < csvRecord.size(); i++) {
final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
final String rawValue = csvRecord.get(i);
for (final RecordField recordField : schema.getFields()) {
String rawValue = null;
final String fieldName = recordField.getFieldName();
if (csvRecord.isSet(fieldName)) {
rawValue = csvRecord.get(fieldName);
} else {
for (final String alias : recordField.getAliases()) {
if (csvRecord.isSet(alias)) {
rawValue = csvRecord.get(alias);
break;
}
}
}
final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
if (rawValue == null) {
rowValues.put(fieldName, null);
if (!dataTypeOption.isPresent() && dropUnknownFields) {
continue;
}
final Object converted = convert(rawValue, recordField.getDataType(), fieldName);
if (converted != null) {
rowValues.put(fieldName, converted);
final Object value;
if (coerceTypes && dataTypeOption.isPresent()) {
value = convert(rawValue, dataTypeOption.get(), rawFieldName);
} else if (dataTypeOption.isPresent()) {
// The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
// dictate a field type. As a result, we will use the schema that we have to attempt to convert
// the value into the desired type if it's a simple type.
value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
} else {
value = rawValue;
}
values.put(rawFieldName, value);
}
return new MapRecord(schema, rowValues);
return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
}
return null;
}
private List<String> getRawFieldNames() {
if (this.rawFieldNames != null) {
return this.rawFieldNames;
}
// Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
final SortedMap<Integer, String> sortedMap = new TreeMap<>();
for (final Map.Entry<String, Integer> entry : csvParser.getHeaderMap().entrySet()) {
sortedMap.put(entry.getValue(), entry.getKey());
}
this.rawFieldNames = new ArrayList<>(sortedMap.values());
return this.rawFieldNames;
}
@Override
public RecordSchema getSchema() {
return schema;
@ -115,7 +151,6 @@ public class CSVRecordReader implements RecordReader {
}
final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
if (trimmed.isEmpty()) {
return null;
}
@ -123,6 +158,40 @@ public class CSVRecordReader implements RecordReader {
return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
}
private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
if (dataType == null || value == null) {
return value;
}
final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
if (trimmed.isEmpty()) {
return null;
}
switch (dataType.getFieldType()) {
case STRING:
return value;
case BOOLEAN:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BYTE:
case CHAR:
case SHORT:
case TIME:
case TIMESTAMP:
case DATE:
if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
} else {
return value;
}
}
return value;
}
@Override
public void close() throws IOException {
csvParser.close();

View File

@ -61,7 +61,7 @@ public class CSVUtils {
.defaultValue("\"")
.required(true)
.build();
static final PropertyDescriptor SKIP_HEADER_LINE = new PropertyDescriptor.Builder()
static final PropertyDescriptor FIRST_LINE_IS_HEADER = new PropertyDescriptor.Builder()
.name("Skip Header Line")
.displayName("Treat First Line as Header")
.description("Specifies whether or not the first line of CSV should be considered a Header or should be considered a record. If the Schema Access Strategy "
@ -74,6 +74,18 @@ public class CSVUtils {
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor IGNORE_CSV_HEADER = new PropertyDescriptor.Builder()
.name("ignore-csv-header")
.displayName("Ignore CSV Header Column Names")
.description("If the first line of a CSV is a header, and the configured schema does not match the fields named in the header line, this controls how "
+ "the Reader will interpret the fields. If this property is true, then the field names mapped to each column are driven only by the configured schema and "
+ "any fields not in the schema will be ignored. If this property is false, then the field names found in the CSV Header will be used as the names of the "
+ "fields.")
.expressionLanguageSupported(false)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.build();
static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
.name("Comment Marker")
.description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
@ -177,7 +189,7 @@ public class CSVUtils {
.withAllowMissingColumnNames()
.withIgnoreEmptyLines();
final PropertyValue skipHeaderPropertyValue = context.getProperty(SKIP_HEADER_LINE);
final PropertyValue skipHeaderPropertyValue = context.getProperty(FIRST_LINE_IS_HEADER);
if (skipHeaderPropertyValue.getValue() != null && skipHeaderPropertyValue.asBoolean()) {
format = format.withFirstRecordAsHeader();
}

View File

@ -20,19 +20,24 @@ package org.apache.nifi.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter {
public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
private final RecordSchema recordSchema;
private final SchemaAccessWriter schemaWriter;
private final String dateFormat;
@ -40,6 +45,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
private final String timestampFormat;
private final CSVPrinter printer;
private final Object[] fieldValues;
private final boolean includeHeaderLine;
private boolean headerWritten = false;
private String[] fieldNames;
public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException {
@ -50,9 +58,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
this.dateFormat = dateFormat;
this.timeFormat = timeFormat;
this.timestampFormat = timestampFormat;
this.includeHeaderLine = includeHeaderLine;
final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]);
final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true);
final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
printer = new CSVPrinter(streamWriter, formatWithHeader);
@ -93,6 +101,34 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
printer.flush();
}
private String[] getFieldNames(final Record record) {
if (fieldNames != null) {
return fieldNames;
}
final Set<String> allFields = new LinkedHashSet<>();
allFields.addAll(record.getRawFieldNames());
allFields.addAll(recordSchema.getFieldNames());
fieldNames = allFields.toArray(new String[0]);
return fieldNames;
}
private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException {
if (headerWritten || !includeHeaderLine) {
return;
}
final Object[] fieldNames;
if (includeOnlySchemaFields) {
fieldNames = recordSchema.getFieldNames().toArray(new Object[0]);
} else {
fieldNames = getFieldNames(record);
}
printer.printRecord(fieldNames);
headerWritten = true;
}
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
@ -101,6 +137,8 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
includeHeaderIfNecessary(record, true);
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
@ -110,6 +148,36 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
return schemaWriter.getAttributes(recordSchema);
}
@Override
public WriteResult writeRawRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
includeHeaderIfNecessary(record, false);
final String[] fieldNames = getFieldNames(record);
// Avoid creating a new Object[] for every Record if we can. But if the record has a different number of columns than does our
// schema, we don't have a lot of options here, so we just create a new Object[] in that case.
final Object[] recordFieldValues = (fieldNames.length == this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
int i = 0;
for (final String fieldName : fieldNames) {
final Optional<RecordField> recordField = recordSchema.getField(fieldName);
if (recordField.isPresent()) {
recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get()));
} else {
recordFieldValues[i++] = record.getAsString(fieldName);
}
}
printer.printRecord(recordFieldValues);
final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
return WriteResult.of(incrementRecordCount(), attributes);
}
@Override
public String getMimeType() {
return "text/csv";

View File

@ -67,6 +67,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
private volatile Grok grok;
private volatile boolean appendUnmatchedLine;
private volatile RecordSchema recordSchema;
private volatile RecordSchema recordSchemaFromGrok;
private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
@ -133,9 +134,11 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
this.recordSchemaFromGrok = createRecordSchema(grok);
final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
this.recordSchema = createRecordSchema(grok);
this.recordSchema = recordSchemaFromGrok;
} else {
this.recordSchema = null;
}
@ -236,6 +239,6 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(flowFile, in, null);
return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
}
}

View File

@ -24,6 +24,7 @@ import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.nifi.serialization.MalformedRecordException;
@ -43,6 +44,7 @@ public class GrokRecordReader implements RecordReader {
private final BufferedReader reader;
private final Grok grok;
private final boolean append;
private final RecordSchema schemaFromGrok;
private RecordSchema schema;
private String nextLine;
@ -55,11 +57,12 @@ public class GrokRecordReader implements RecordReader {
+ "(?:Suppressed\\: )|"
+ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final boolean append) {
public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final RecordSchema schemaFromGrok, final boolean append) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.grok = grok;
this.schema = schema;
this.append = append;
this.schemaFromGrok = schemaFromGrok;
}
@Override
@ -68,7 +71,7 @@ public class GrokRecordReader implements RecordReader {
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
Map<String, Object> valueMap = null;
while (valueMap == null || valueMap.isEmpty()) {
final String line = nextLine == null ? reader.readLine() : nextLine;
@ -85,7 +88,7 @@ public class GrokRecordReader implements RecordReader {
// Read the next line to see if it matches the pattern (in which case we will simply leave it for
// the next call to nextRecord()) or we will attach it to the previously read record.
String stackTrace = null;
final StringBuilder toAppend = new StringBuilder();
final StringBuilder trailingText = new StringBuilder();
while ((nextLine = reader.readLine()) != null) {
final Match nextLineMatch = grok.match(nextLine);
nextLineMatch.captures();
@ -97,7 +100,7 @@ public class GrokRecordReader implements RecordReader {
stackTrace = readStackTrace(nextLine);
break;
} else if (append) {
toAppend.append("\n").append(nextLine);
trailingText.append("\n").append(nextLine);
}
} else {
// The next line matched our pattern.
@ -105,49 +108,78 @@ public class GrokRecordReader implements RecordReader {
}
}
try {
final List<DataType> fieldTypes = schema.getDataTypes();
final Map<String, Object> values = new HashMap<>(fieldTypes.size());
final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields);
return record;
}
for (final RecordField field : schema.getFields()) {
Object value = valueMap.get(field.getFieldName());
if (value == null) {
for (final String alias : field.getAliases()) {
value = valueMap.get(alias);
if (value != null) {
break;
}
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) {
final Map<String, Object> converted = new HashMap<>();
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
final Object rawValue = entry.getValue();
final Object normalizedValue;
if (rawValue instanceof List) {
final List<?> list = (List<?>) rawValue;
final String[] array = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
final Object rawObject = list.get(i);
array[i] = rawObject == null ? null : rawObject.toString();
}
normalizedValue = array;
} else {
normalizedValue = rawValue == null ? null : rawValue.toString();
}
final Optional<RecordField> optionalRecordField = schema.getField(fieldName);
final Object coercedValue;
if (coerceTypes && optionalRecordField.isPresent()) {
final RecordField field = optionalRecordField.get();
final DataType fieldType = field.getDataType();
coercedValue = convert(fieldType, normalizedValue, fieldName);
} else {
coercedValue = normalizedValue;
}
converted.put(fieldName, coercedValue);
}
// If there is any trailing text, determine the last column from the grok schema
// and then append the trailing text to it.
if (append && trailingText.length() > 0) {
String lastPopulatedFieldName = null;
final List<RecordField> schemaFields = schemaFromGrok.getFields();
for (int i = schemaFields.size() - 1; i >= 0; i--) {
final RecordField field = schemaFields.get(i);
Object value = converted.get(field.getFieldName());
if (value != null) {
lastPopulatedFieldName = field.getFieldName();
break;
}
for (final String alias : field.getAliases()) {
value = converted.get(alias);
if (value != null) {
lastPopulatedFieldName = alias;
break;
}
}
}
final String fieldName = field.getFieldName();
if (lastPopulatedFieldName != null) {
final Object value = converted.get(lastPopulatedFieldName);
if (value == null) {
values.put(fieldName, null);
continue;
converted.put(lastPopulatedFieldName, trailingText.toString());
} else if (value instanceof String) { // if not a String it is a List and we will just drop the trailing text
converted.put(lastPopulatedFieldName, (String) value + trailingText.toString());
}
final DataType fieldType = field.getDataType();
final Object converted = convert(fieldType, value.toString(), fieldName);
values.put(fieldName, converted);
}
if (append && toAppend.length() > 0) {
final String lastFieldName = schema.getField(schema.getFieldCount() - 1).getFieldName();
final int fieldIndex = STACK_TRACE_COLUMN_NAME.equals(lastFieldName) ? schema.getFieldCount() - 2 : schema.getFieldCount() - 1;
final String lastFieldBeforeStackTrace = schema.getFieldNames().get(fieldIndex);
final Object existingValue = values.get(lastFieldBeforeStackTrace);
final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString();
values.put(lastFieldBeforeStackTrace, updatedValue);
}
values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
return new MapRecord(schema, values);
} catch (final Exception e) {
throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + nextLine, e);
}
converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
return new MapRecord(schema, converted);
}
@ -200,22 +232,23 @@ public class GrokRecordReader implements RecordReader {
}
protected Object convert(final DataType fieldType, final String string, final String fieldName) {
protected Object convert(final DataType fieldType, final Object rawValue, final String fieldName) {
if (fieldType == null) {
return string;
return rawValue;
}
if (string == null) {
if (rawValue == null) {
return null;
}
// If string is empty then return an empty string if field type is STRING. If field type is
// anything else, we can't really convert it so return null
if (string.isEmpty() && fieldType.getFieldType() != RecordFieldType.STRING) {
final boolean fieldEmpty = rawValue instanceof String && ((String) rawValue).isEmpty();
if (fieldEmpty && fieldType.getFieldType() != RecordFieldType.STRING) {
return null;
}
return DataTypeUtils.convertType(string, fieldType, fieldName);
return DataTypeUtils.convertType(rawValue, fieldType, fieldName);
}

View File

@ -19,19 +19,30 @@ package org.apache.nifi.json;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
@ -70,8 +81,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
}
@Override
public Record nextRecord() throws IOException, MalformedRecordException {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
if (firstObjectConsumed && !array) {
return null;
}
@ -79,7 +91,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final JsonNode nextNode = getNextJsonNode();
final RecordSchema schema = getSchema();
try {
return convertJsonNodeToRecord(nextNode, schema);
return convertJsonNodeToRecord(nextNode, schema, coerceTypes, dropUnknownFields);
} catch (final MalformedRecordException mre) {
throw mre;
} catch (final IOException ioe) {
@ -91,7 +103,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
protected Object getRawNodeValue(final JsonNode fieldNode) throws IOException {
if (fieldNode == null || !fieldNode.isValueNode()) {
return getRawNodeValue(fieldNode, null);
}
protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
if (fieldNode == null || fieldNode.isNull()) {
return null;
}
@ -111,6 +127,53 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
return fieldNode.getTextValue();
}
if (fieldNode.isArray()) {
final ArrayNode arrayNode = (ArrayNode) fieldNode;
final int numElements = arrayNode.size();
final Object[] arrayElements = new Object[numElements];
int count = 0;
final DataType elementDataType;
if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
elementDataType = arrayDataType.getElementType();
} else {
elementDataType = null;
}
for (final JsonNode node : arrayNode) {
final Object value = getRawNodeValue(node, elementDataType);
arrayElements[count++] = value;
}
return arrayElements;
}
if (fieldNode.isObject()) {
RecordSchema childSchema;
if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
final RecordDataType recordDataType = (RecordDataType) dataType;
childSchema = recordDataType.getChildSchema();
} else {
childSchema = null;
}
if (childSchema == null) {
childSchema = new SimpleRecordSchema(Collections.emptyList());
}
final Iterator<String> fieldNames = fieldNode.getFieldNames();
final Map<String, Object> childValues = new HashMap<>();
while (fieldNames.hasNext()) {
final String childFieldName = fieldNames.next();
final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
childValues.put(childFieldName, childValue);
}
final MapRecord record = new MapRecord(childSchema, childValues);
return record;
}
return null;
}
@ -159,5 +222,5 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
return Optional.ofNullable(firstJsonNode);
}
protected abstract Record convertJsonNodeToRecord(final JsonNode nextNode, final RecordSchema schema) throws IOException, MalformedRecordException;
protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.json;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -29,6 +30,7 @@ import java.util.function.Supplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@ -89,7 +91,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
}
@Override
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) throws IOException {
if (jsonNode == null) {
return null;
}
@ -100,7 +102,8 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
final String fieldName = entry.getKey();
final DataType desiredType = schema.getDataType(fieldName).orElse(null);
if (desiredType == null) {
if (desiredType == null && dropUnknownFields) {
continue;
}
@ -117,7 +120,13 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
final Optional<RecordField> field = schema.getField(fieldName);
final Object defaultValue = field.isPresent() ? field.get().getDefaultValue() : null;
value = convert(value, desiredType, fieldName, defaultValue);
if (coerceTypes && desiredType != null) {
value = convert(value, desiredType, fieldName, defaultValue);
} else {
final DataType dataType = field.isPresent() ? field.get().getDataType() : null;
value = convert(value, dataType);
}
values.put(fieldName, value);
}
@ -125,6 +134,70 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
}
@SuppressWarnings("unchecked")
protected Object convert(final Object value, final DataType dataType) {
if (value == null) {
return null;
}
if (value instanceof List) {
final List<?> list = (List<?>) value;
final Object[] array = new Object[list.size()];
final DataType elementDataType;
if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
elementDataType = ((ArrayDataType) dataType).getElementType();
} else {
elementDataType = null;
}
int i = 0;
for (final Object val : list) {
array[i++] = convert(val, elementDataType);
}
return array;
}
if (value instanceof Map) {
final Map<String, ?> map = (Map<String, ?>) value;
boolean record = false;
for (final Object obj : map.values()) {
if (obj instanceof JsonNode) {
record = true;
}
}
if (!record) {
return value;
}
RecordSchema childSchema = null;
if (dataType != null && dataType.getFieldType() == RecordFieldType.RECORD) {
childSchema = ((RecordDataType) dataType).getChildSchema();
}
if (childSchema == null) {
childSchema = new SimpleRecordSchema(Collections.emptyList());
}
final Map<String, Object> values = new HashMap<>();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String key = entry.getKey();
final Object childValue = entry.getValue();
final RecordField recordField = childSchema.getField(key).orElse(null);
final DataType childDataType = recordField == null ? null : recordField.getDataType();
values.put(key, convert(childValue, childDataType));
}
return new MapRecord(childSchema, values);
}
return value;
}
@SuppressWarnings("unchecked")
protected Object convert(final Object value, final DataType dataType, final String fieldName, final Object defaultValue) {
if (value == null) {

View File

@ -23,6 +23,7 @@ import java.text.DateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@ -69,78 +70,74 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
@Override
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException {
return convertJsonNodeToRecord(jsonNode, schema, null);
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields)
throws IOException, MalformedRecordException {
return convertJsonNodeToRecord(jsonNode, schema, coerceTypes, dropUnknownFields, null);
}
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix) throws IOException, MalformedRecordException {
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknown, final String fieldNamePrefix)
throws IOException, MalformedRecordException {
if (jsonNode == null) {
return null;
}
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
for (final RecordField field : schema.getFields()) {
final String fieldName = field.getFieldName();
return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown);
}
final JsonNode fieldNode = getJsonNode(jsonNode, field);
final DataType desiredType = field.getDataType();
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
final Object value = convertField(fieldNode, fullFieldName, desiredType);
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
final Map<String, Object> values = new LinkedHashMap<>();
final Iterator<String> fieldNames = jsonNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final JsonNode childNode = jsonNode.get(fieldName);
final RecordField recordField = schema.getField(fieldName).orElse(null);
if (recordField == null && dropUnknown) {
continue;
}
final Object value;
if (coerceTypes && recordField != null) {
final DataType desiredType = recordField.getDataType();
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
} else {
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
}
values.put(fieldName, value);
}
final Supplier<String> supplier = () -> jsonNode.toString();
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"));
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
}
private JsonNode getJsonNode(final JsonNode parent, final RecordField field) {
JsonNode fieldNode = parent.get(field.getFieldName());
if (fieldNode != null) {
return fieldNode;
}
for (final String alias : field.getAliases()) {
fieldNode = parent.get(alias);
if (fieldNode != null) {
return fieldNode;
}
}
return fieldNode;
}
protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException {
protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
if (fieldNode == null || fieldNode.isNull()) {
return null;
}
switch (desiredType.getFieldType()) {
case BOOLEAN:
return DataTypeUtils.toBoolean(getRawNodeValue(fieldNode), fieldName);
case BYTE:
return DataTypeUtils.toByte(getRawNodeValue(fieldNode), fieldName);
case CHAR:
return DataTypeUtils.toCharacter(getRawNodeValue(fieldNode), fieldName);
case DOUBLE:
return DataTypeUtils.toDouble(getRawNodeValue(fieldNode), fieldName);
case FLOAT:
return DataTypeUtils.toFloat(getRawNodeValue(fieldNode), fieldName);
case INT:
return DataTypeUtils.toInteger(getRawNodeValue(fieldNode), fieldName);
case LONG:
return DataTypeUtils.toLong(getRawNodeValue(fieldNode), fieldName);
case SHORT:
return DataTypeUtils.toShort(getRawNodeValue(fieldNode), fieldName);
case STRING:
return DataTypeUtils.toString(getRawNodeValue(fieldNode),
() -> DataTypeUtils.getDateFormat(desiredType.getFieldType(), LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT));
case DATE:
return DataTypeUtils.toDate(getRawNodeValue(fieldNode), LAZY_DATE_FORMAT, fieldName);
case TIME:
return DataTypeUtils.toTime(getRawNodeValue(fieldNode), LAZY_TIME_FORMAT, fieldName);
case TIMESTAMP:
return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), LAZY_TIMESTAMP_FORMAT, fieldName);
case TIMESTAMP: {
final Object rawValue = getRawNodeValue(fieldNode);
final Object converted = DataTypeUtils.convertType(rawValue, desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
return converted;
}
case MAP: {
final DataType valueType = ((MapDataType) desiredType).getValueType();
@ -149,7 +146,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
while (fieldNameItr.hasNext()) {
final String childName = fieldNameItr.next();
final JsonNode childNode = fieldNode.get(childName);
final Object childValue = convertField(childNode, fieldName, valueType);
final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown);
map.put(childName, childValue);
}
@ -162,7 +159,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
int count = 0;
for (final JsonNode node : arrayNode) {
final DataType elementType = ((ArrayDataType) desiredType).getElementType();
final Object converted = convertField(node, fieldName, elementType);
final Object converted = convertField(node, fieldName, elementType, dropUnknown);
arrayElements[count++] = converted;
}
@ -187,7 +184,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
childSchema = new SimpleRecordSchema(fields);
}
return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".");
return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
} else {
return null;
}

View File

@ -29,7 +29,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -44,7 +46,7 @@ import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter {
public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
private final ComponentLog logger;
private final SchemaAccessWriter schemaAccess;
private final RecordSchema recordSchema;
@ -107,6 +109,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
}
}
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
@ -116,12 +119,25 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
schemaAccess.writeHeader(recordSchema, getOutputStream());
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
return schemaAccess.getAttributes(recordSchema);
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
throws JsonGenerationException, IOException {
@Override
public WriteResult writeRawRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
return WriteResult.of(incrementRecordCount(), schemaAccess.getAttributes(recordSchema));
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,
final GeneratorTask startTask, final GeneratorTask endTask, final boolean schemaAware) throws JsonGenerationException, IOException {
final Optional<SerializedForm> serializedForm = record.getSerializedForm();
if (serializedForm.isPresent()) {
@ -137,21 +153,36 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
try {
startTask.apply(generator);
for (int i = 0; i < writeSchema.getFieldCount(); i++) {
final RecordField field = writeSchema.getField(i);
final String fieldName = field.getFieldName();
final Object value = record.getValue(field);
if (value == null) {
generator.writeNullField(fieldName);
continue;
if (schemaAware) {
for (final RecordField field : writeSchema.getFields()) {
final String fieldName = field.getFieldName();
final Object value = record.getValue(field);
if (value == null) {
generator.writeNullField(fieldName);
continue;
}
generator.writeFieldName(fieldName);
final DataType dataType = writeSchema.getDataType(fieldName).get();
writeValue(generator, value, fieldName, dataType);
}
} else {
for (final String fieldName : record.getRawFieldNames()) {
final Object value = record.getValue(fieldName);
if (value == null) {
generator.writeNullField(fieldName);
continue;
}
generator.writeFieldName(fieldName);
final DataType dataType = writeSchema.getDataType(fieldName).get();
writeValue(generator, value, fieldName, dataType, i < writeSchema.getFieldCount() - 1);
generator.writeFieldName(fieldName);
writeRawValue(generator, value, fieldName);
}
}
endTask.apply(generator);
} catch (final Exception e) {
logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e});
@ -159,9 +190,51 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
}
}
@SuppressWarnings("unchecked")
private void writeRawValue(final JsonGenerator generator, final Object value, final String fieldName)
throws JsonGenerationException, IOException {
if (value == null) {
generator.writeNull();
return;
}
if (value instanceof Record) {
final Record record = (Record) value;
writeRecord(record, record.getSchema(), generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject(), false);
return;
}
if (value instanceof Map) {
final Map<String, ?> map = (Map<String, ?>) value;
generator.writeStartObject();
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
generator.writeFieldName(mapKey);
writeRawValue(generator, mapValue, fieldName + "." + mapKey);
}
generator.writeEndObject();
return;
}
if (value instanceof Object[]) {
final Object[] values = (Object[]) value;
generator.writeStartArray();
for (final Object element : values) {
writeRawValue(generator, element, fieldName);
}
generator.writeEndArray();
return;
}
generator.writeObject(value);
}
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType, final boolean moreCols)
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType)
throws JsonGenerationException, IOException {
if (value == null) {
generator.writeNull();
@ -242,7 +315,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
final Record record = (Record) coercedValue;
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
writeRecord(record, childSchema, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject());
writeRecord(record, childSchema, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject(), true);
break;
}
case MAP: {
@ -250,12 +323,12 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
final DataType valueDataType = mapDataType.getValueType();
final Map<String, ?> map = (Map<String, ?>) coercedValue;
generator.writeStartObject();
int i = 0;
for (final Map.Entry<String, ?> entry : map.entrySet()) {
final String mapKey = entry.getKey();
final Object mapValue = entry.getValue();
generator.writeFieldName(mapKey);
writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType, ++i < map.size());
writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
}
generator.writeEndObject();
break;
@ -278,9 +351,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
throws JsonGenerationException, IOException {
generator.writeStartArray();
for (int i = 0; i < values.length; i++) {
final boolean moreEntries = i < values.length - 1;
final Object element = values[i];
writeValue(generator, element, fieldName, elementType, moreEntries);
writeValue(generator, element, fieldName, elementType);
}
generator.writeEndArray();
}

View File

@ -73,42 +73,44 @@
<h2>Examples</h2>
<h3>Example 1</h3>
<p>
As an example, consider a FlowFile whose contents consists of the following:
</p>
<code>
id, name, balance, join_date, notes<br />
1, John, 48.23, 04/03/2007 "Our very<br />
<code>
id, name, balance, join_date, notes<br />
1, John, 48.23, 04/03/2007 "Our very<br />
first customer!"<br />
2, Jane, 1245.89, 08/22/2009,<br />
3, Frank Franklin, "48481.29", 04/04/2016,<br />
</code>
2, Jane, 1245.89, 08/22/2009,<br />
3, Frank Franklin, "48481.29", 04/04/2016,<br />
</code>
<p>
Additionally, let's consider that this Controller Service is configured with the Schema Registry pointing to an AvroSchemaRegistry and the schema is
configured as the following:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name": "type": "string" },
{ "name": "balance": "type": "double" },
{ "name": "join_date", "type": {
"type": "int",
"logicalType": "date"
},
{ "name": "notes": "type": "string" }
]
}
</pre>
</code>
<code>
<pre>
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name": "type": "string" },
{ "name": "balance": "type": "double" },
{ "name": "join_date", "type": {
"type": "int",
"logicalType": "date"
}},
{ "name": "notes": "type": "string" }
]
}
</pre>
</code>
<p>
In the example above, we see that the 'join_date' column is a Date type. In order for the CSV Reader to be able to properly parse a value as a date,
@ -213,5 +215,120 @@ first customer!"<br />
</table>
<h3>Example 2 - Schema with CSV Header Line</h3>
<p>
When CSV data consists of a header line that outlines the column names, the reader provides
a couple of different properties for configuring how to handle these column names. The
"Schema Access Strategy" property as well as the associated properties ("Schema Registry," "Schema Text," and
"Schema Name" properties) can be used to specify how to obtain the schema. If the "Schema Access Strategy" is set
to "Use String Fields From Header" then the header line of the CSV will be used to determine the schema. Otherwise,
a schema will be referenced elsewhere. But what happens if a schema is obtained from a Schema Registry, for instance,
and the CSV Header indicates a different set of column names?
</p>
<p>
For example, let's say that the following schema is obtained from the Schema Registry:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name": "type": "string" },
{ "name": "balance": "type": "double" },
{ "name": "memo": "type": "string" }
]
}
</pre>
</code>
<p>
And the CSV contains the following data:
</p>
<code>
<pre>
id, name, balance, notes
1, John Doe, 123.45, First Customer
</pre>
</code>
<p>
Note here that our schema indicates that the final column is named "memo" whereas the CSV Header indicates that it is named "notes."
</p>
<p>
In this case, the reader will look at the "Ignore CSV Header Column Names" property. If this property is set to "true" then the column names
provided in the CSV will simply be ignored and the last column will be called "memo." However, if the "Ignore CSV Header Column Names" property
is set to "false" then the result will be that the last column will be named "notes" and each record will have a null value for the "memo" column.
</p>
<p>
With "Ignore CSV Header Column Names" property set to "false":<br />
<table>
<head>
<th>Field Name</th>
<th>Field Value</th>
</head>
<body>
<tr>
<td>id</td>
<td>1</td>
</tr>
<tr>
<td>name</td>
<td>John Doe</td>
</tr>
<tr>
<td>balance</td>
<td>123.45</td>
</tr>
<tr>
<td>memo</td>
<td>First Customer</td>
</tr>
</body>
</table>
</p>
<p>
With "Ignore CSV Header Column Names" property set to "true":<br />
<table>
<head>
<th>Field Name</th>
<th>Field Value</th>
</head>
<body>
<tr>
<td>id</td>
<td>1</td>
</tr>
<tr>
<td>name</td>
<td>John Doe</td>
</tr>
<tr>
<td>balance</td>
<td>123.45</td>
</tr>
<tr>
<td>notes</td>
<td>First Customer</td>
</tr>
<tr>
<td>memo</td>
<td><code>null</code></td>
</tr>
</body>
</table>
</p>
</body>
</html>

View File

@ -253,8 +253,8 @@ public class TestAvroReaderWithEmbeddedSchema {
accountValues.put("accountId", 83L);
final List<RecordField> accountRecordFields = new ArrayList<>();
accountRecordFields.add(new RecordField("accountId", RecordFieldType.LONG.getDataType()));
accountRecordFields.add(new RecordField("accountName", RecordFieldType.STRING.getDataType()));
accountRecordFields.add(new RecordField("accountId", RecordFieldType.LONG.getDataType(), false));
accountRecordFields.add(new RecordField("accountName", RecordFieldType.STRING.getDataType(), false));
final RecordSchema accountRecordSchema = new SimpleRecordSchema(accountRecordFields);
final Record mapRecord = new MapRecord(accountRecordSchema, accountValues);
@ -269,8 +269,8 @@ public class TestAvroReaderWithEmbeddedSchema {
dogMap.put("dogTailLength", 14);
final List<RecordField> dogRecordFields = new ArrayList<>();
dogRecordFields.add(new RecordField("dogTailLength", RecordFieldType.INT.getDataType()));
dogRecordFields.add(new RecordField("dogName", RecordFieldType.STRING.getDataType()));
dogRecordFields.add(new RecordField("dogTailLength", RecordFieldType.INT.getDataType(), false));
dogRecordFields.add(new RecordField("dogName", RecordFieldType.STRING.getDataType(), false));
final RecordSchema dogRecordSchema = new SimpleRecordSchema(dogRecordFields);
final Record dogRecord = new MapRecord(dogRecordSchema, dogMap);
@ -281,8 +281,8 @@ public class TestAvroReaderWithEmbeddedSchema {
catMap.put("catTailLength", 1);
final List<RecordField> catRecordFields = new ArrayList<>();
catRecordFields.add(new RecordField("catTailLength", RecordFieldType.INT.getDataType()));
catRecordFields.add(new RecordField("catName", RecordFieldType.STRING.getDataType()));
catRecordFields.add(new RecordField("catTailLength", RecordFieldType.INT.getDataType(), false));
catRecordFields.add(new RecordField("catName", RecordFieldType.STRING.getDataType(), false));
final RecordSchema catRecordSchema = new SimpleRecordSchema(catRecordFields);
final Record catRecord = new MapRecord(catRecordSchema, catMap);

View File

@ -57,6 +57,11 @@ public class TestCSVRecordReader {
return fields;
}
private CSVRecordReader createReader(final InputStream in, final RecordSchema schema) throws IOException {
return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
}
@Test
public void testDate() throws IOException, MalformedRecordException {
final String text = "date\n11/30/1983";
@ -66,7 +71,7 @@ public class TestCSVRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format,
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
"MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
final Record record = reader.nextRecord();
@ -87,9 +92,8 @@ public class TestCSVRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) {
final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@ -106,9 +110,8 @@ public class TestCSVRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) {
final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@ -129,9 +132,8 @@ public class TestCSVRecordReader {
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) {
final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@ -157,9 +159,8 @@ public class TestCSVRecordReader {
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream baos = new ByteArrayInputStream(inputData)) {
final CSVRecordReader reader = new CSVRecordReader(baos, Mockito.mock(ComponentLog.class), schema, format,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final Record record = reader.nextRecord();
assertNotNull(record);
@ -176,4 +177,149 @@ public class TestCSVRecordReader {
assertNull(reader.nextRecord());
}
}
@Test
public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, continent";
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North America";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertNull(record.getValue("continent"));
assertNull(reader.nextRecord());
}
// test nextRawRecord does contain 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final Record record = reader.nextRecord(false, false);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertNull(record.getValue("country"));
assertEquals("North America", record.getValue("continent"));
assertNull(reader.nextRecord(false, false));
}
}
@Test
public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode";
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
// If schema says that there are fields a, b, c
// and the CSV has a header line that says field names are a, b
// and then the data has values 1,2,3
// then a=1, b=2, c=null
assertNull(record.getValue("country"));
assertNull(reader.nextRecord());
}
// Create another Record Reader that indicates that the header line is present but should be ignored. This should cause
// our schema to be the definitive list of what fields exist.
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, true,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
final Record record = reader.nextRecord();
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
// If schema says that there are fields a, b, c
// and the CSV has a header line that says field names are a, b
// and then the data has values 1,2,3
// then a=1, b=2, c=null
// But if we configure the reader to Ignore the header, then this will not occur!
assertEquals("USA", record.getValue("country"));
assertNull(reader.nextRecord());
}
}
@Test
public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException {
final List<RecordField> fields = getDefaultFields();
final RecordSchema schema = new SimpleRecordSchema(fields);
final String headerLine = "id, name, balance, address, city, state, zipCode, country";
final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA, North America";
final String csvData = headerLine + "\n" + inputRecord;
final byte[] inputData = csvData.getBytes();
// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final Record record = reader.nextRecord(false, false);
assertNotNull(record);
assertEquals("1", record.getValue("id"));
assertEquals("John", record.getValue("name"));
assertEquals("40.80", record.getValue("balance"));
assertEquals("123 My Street", record.getValue("address"));
assertEquals("My City", record.getValue("city"));
assertEquals("MS", record.getValue("state"));
assertEquals("11111", record.getValue("zipCode"));
assertEquals("USA", record.getValue("country"));
assertEquals("North America", record.getValue("unknown_field_index_8"));
assertNull(reader.nextRecord(false, false));
}
}
}

View File

@ -30,6 +30,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@ -127,6 +128,172 @@ public class TestWriteCSVResult {
assertEquals(expectedValues, values);
}
@Test
public void testExtraFieldInWriteRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", "1");
values.put("name", "John");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.write(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id\n1\n", output);
}
@Test
public void testExtraFieldInWriteRawRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("name", "John");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id,name\n1,John\n", output);
}
@Test
public void testMissingFieldWriteRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id,name\n1,\n", output);
}
@Test
public void testMissingFieldWriteRawRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id,name\n1,\n", output);
}
@Test
public void testMissingAndExtraFieldWriteRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("dob", "1/1/1970");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id,name\n1,\n", output);
}
@Test
public void testMissingAndExtraFieldWriteRawRecord() throws IOException {
final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("dob", "1/1/1970");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String output;
try (final WriteCSVResult writer = new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
writer.flush();
output = baos.toString();
}
assertEquals("id,dob,name\n1,1/1/1970,\n", output);
}
private DateFormat getDateFormat(final String format) {
final DateFormat df = new SimpleDateFormat(format);
df.setTimeZone(TimeZone.getTimeZone("gmt"));

View File

@ -48,7 +48,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true);
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
@ -78,7 +78,7 @@ public class TestGrokRecordReader {
final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n"
+ "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces";
final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, GrokReader.createRecordSchema(grok), true);
final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final Object[] values = deserializer.nextRecord().getValues();
@ -101,7 +101,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true);
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"};
@ -125,7 +125,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true);
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"};
@ -157,7 +157,7 @@ public class TestGrokRecordReader {
grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), true);
final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, GrokReader.createRecordSchema(grok), GrokReader.createRecordSchema(grok), true);
final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
final String[] messages = new String[] {"message without stack trace",
@ -211,7 +211,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("message"));
assertTrue(fieldNames.contains("stackTrace")); // always implicitly there
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true);
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true);
final Record record = deserializer.nextRecord();
assertEquals("May 22 15:58:23", record.getValue("timestamp"));
@ -248,7 +248,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false);
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, false);
final Record record = deserializer.nextRecord();
assertEquals("1", record.getValue("first"));
@ -283,7 +283,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("fourth"));
assertTrue(fieldNames.contains("fifth"));
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, false);
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, false);
for (int i = 0; i < 2; i++) {
final Record record = deserializer.nextRecord();

View File

@ -161,6 +161,74 @@ public class TestJsonTreeRowRecordReader {
}
}
@Test
public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord();
assertEquals(1, schemaValidatedRecord.getValue("id"));
assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
assertNull(schemaValidatedRecord.getValue("balance"));
}
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id"));
assertEquals("John Doe", rawRecord.getValue("name"));
assertEquals(4750.89, rawRecord.getValue("balance"));
assertEquals("123 My Street", rawRecord.getValue("address"));
assertEquals("My City", rawRecord.getValue("city"));
assertEquals("MS", rawRecord.getValue("state"));
assertEquals("11111", rawRecord.getValue("zipCode"));
assertEquals("USA", rawRecord.getValue("country"));
}
}
@Test
public void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord();
assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema
assertEquals("John Doe", schemaValidatedRecord.getValue("name"));
assertNull(schemaValidatedRecord.getValue("balance"));
assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
}
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1
assertEquals("John Doe", rawRecord.getValue("name"));
assertEquals(4750.89, rawRecord.getValue("balance"));
assertEquals("123 My Street", rawRecord.getValue("address"));
assertEquals("My City", rawRecord.getValue("city"));
assertEquals("MS", rawRecord.getValue("state"));
assertEquals("11111", rawRecord.getValue("zipCode"));
assertEquals("USA", rawRecord.getValue("country"));
assertEquals(8, rawRecord.getRawFieldNames().size());
}
}
@Test
public void testSingleJsonElement() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());

View File

@ -183,4 +183,162 @@ public class TestWriteJsonResult {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testExtraFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", "1");
values.put("name", "John");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\"}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testExtraFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("name", "John");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\",\"name\":\"John\"}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testMissingFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\",\"name\":null}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testMissingFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\"}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testMissingAndExtraFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("dob", "1/1/1970");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\",\"name\":null}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
@Test
public void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new LinkedHashMap<>();
values.put("id", "1");
values.put("dob", "1/1/1970");
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
}
final byte[] data = baos.toByteArray();
final String expected = "[{\"id\":\"1\",\"dob\":\"1/1/1970\"}]";
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
}