NIFI-4717: Several minor bug fixes and performance improvements around record-oriented processors

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2359
This commit is contained in:
Mark Payne 2017-12-21 12:31:32 -05:00 committed by Matthew Burgess
parent c59a967623
commit c91d99884a
15 changed files with 236 additions and 96 deletions

View File

@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
@ -33,7 +32,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
public class SimpleRecordSchema implements RecordSchema { public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null; private List<RecordField> fields = null;
private Map<String, Integer> fieldIndices = null; private Map<String, RecordField> fieldMap = null;
private final boolean textAvailable; private final boolean textAvailable;
private final String text; private final String text;
private final String schemaFormat; private final String schemaFormat;
@ -88,29 +87,25 @@ public class SimpleRecordSchema implements RecordSchema {
} }
public void setFields(final List<RecordField> fields) { public void setFields(final List<RecordField> fields) {
if (this.fields != null) { if (this.fields != null) {
throw new IllegalArgumentException("Fields have already been set."); throw new IllegalArgumentException("Fields have already been set.");
} }
this.fields = Collections.unmodifiableList(new ArrayList<>(fields)); this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.fieldIndices = new HashMap<>(fields.size()); this.fieldMap = new HashMap<>(fields.size() * 2);
int index = 0;
for (final RecordField field : fields) { for (final RecordField field : fields) {
Integer previousValue = fieldIndices.put(field.getFieldName(), index); RecordField previousValue = fieldMap.put(field.getFieldName(), field);
if (previousValue != null) { if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
} }
for (final String alias : field.getAliases()) { for (final String alias : field.getAliases()) {
previousValue = fieldIndices.put(alias, index); previousValue = fieldMap.put(alias, field);
if (previousValue != null) { if (previousValue != null) {
throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
} }
} }
index++;
} }
} }
@ -138,24 +133,18 @@ public class SimpleRecordSchema implements RecordSchema {
@Override @Override
public Optional<DataType> getDataType(final String fieldName) { public Optional<DataType> getDataType(final String fieldName) {
final OptionalInt idx = getFieldIndex(fieldName); final RecordField field = fieldMap.get(fieldName);
return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty(); if (field == null) {
return Optional.empty();
}
return Optional.of(field.getDataType());
} }
@Override @Override
public Optional<RecordField> getField(final String fieldName) { public Optional<RecordField> getField(final String fieldName) {
final OptionalInt indexOption = getFieldIndex(fieldName); return Optional.ofNullable(fieldMap.get(fieldName));
if (indexOption.isPresent()) {
return Optional.of(fields.get(indexOption.getAsInt()));
} }
return Optional.empty();
}
private OptionalInt getFieldIndex(final String fieldName) {
final Integer index = fieldIndices.get(fieldName);
return index == null ? OptionalInt.empty() : OptionalInt.of(index);
}
@Override @Override
public boolean equals(final Object obj) { public boolean equals(final Object obj) {

View File

@ -70,10 +70,10 @@ public class MapRecord implements Record {
private Map<String, Object> checkTypes(final Map<String, Object> values, final RecordSchema schema) { private Map<String, Object> checkTypes(final Map<String, Object> values, final RecordSchema schema) {
for (final RecordField field : schema.getFields()) { for (final RecordField field : schema.getFields()) {
final Object value = getExplicitValue(field, values); Object value = getExplicitValue(field, values);
if (value == null) { if (value == null) {
if (field.isNullable()) { if (field.isNullable() || field.getDefaultValue() != null) {
continue; continue;
} }
@ -109,7 +109,12 @@ public class MapRecord implements Record {
final Object[] values = new Object[schema.getFieldCount()]; final Object[] values = new Object[schema.getFieldCount()];
int i = 0; int i = 0;
for (final RecordField recordField : schema.getFields()) { for (final RecordField recordField : schema.getFields()) {
values[i++] = getValue(recordField); Object value = getExplicitValue(recordField);
if (value == null) {
value = recordField.getDefaultValue();
}
values[i++] = value;
} }
return values; return values;
} }

View File

@ -68,7 +68,15 @@ public class RecordField {
this.fieldName = Objects.requireNonNull(fieldName); this.fieldName = Objects.requireNonNull(fieldName);
this.dataType = Objects.requireNonNull(dataType); this.dataType = Objects.requireNonNull(dataType);
this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
// If aliases is the empty set, don't bother with the expense of wrapping in an unmodifiableSet.
Objects.requireNonNull(aliases);
if ((Set<?>) aliases == Collections.EMPTY_SET) {
this.aliases = aliases;
} else {
this.aliases = Collections.unmodifiableSet(aliases);
}
this.defaultValue = defaultValue; this.defaultValue = defaultValue;
this.nullable = nullable; this.nullable = nullable;
} }

View File

@ -31,6 +31,8 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -174,7 +176,13 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final Object obj = rs.getObject(columnIndex); final Object obj = rs.getObject(columnIndex);
if (obj == null || !(obj instanceof Record)) { if (obj == null || !(obj instanceof Record)) {
return RecordFieldType.RECORD.getDataType(); final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
RecordFieldType.TIMESTAMP)
.map(recordFieldType -> recordFieldType.getDataType())
.collect(Collectors.toList());
return RecordFieldType.CHOICE.getChoiceDataType(dataTypes);
} }
final Record record = (Record) obj; final Record record = (Record) obj;

View File

@ -891,6 +891,9 @@ public class DataTypeUtils {
if (otherSchema == null) { if (otherSchema == null) {
return thisSchema; return thisSchema;
} }
if (thisSchema == otherSchema) {
return thisSchema;
}
final List<RecordField> otherFields = otherSchema.getFields(); final List<RecordField> otherFields = otherSchema.getFields();
if (otherFields.isEmpty()) { if (otherFields.isEmpty()) {

View File

@ -225,7 +225,7 @@ public class MockPropertyValue implements PropertyValue {
@Override @Override
public boolean isExpressionLanguagePresent() { public boolean isExpressionLanguagePresent() {
if (!expectExpressions) { if (!Boolean.TRUE.equals(expectExpressions)) {
return false; return false;
} }

View File

@ -27,9 +27,11 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -150,9 +152,20 @@ public class AvroTypeUtil {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> options = choiceDataType.getPossibleSubTypes(); final List<DataType> options = choiceDataType.getPossibleSubTypes();
// We need to keep track of which types have been added to the union, because if we have
// two elements in the UNION with the same type, it will fail - even if the logical type is
// different. So if we have an int and a logical type date (which also has a 'concrete type' of int)
// then an Exception will be thrown when we try to create the union. To avoid this, we just keep track
// of the Types and avoid adding it in such a case.
final List<Schema> unionTypes = new ArrayList<>(options.size()); final List<Schema> unionTypes = new ArrayList<>(options.size());
final Set<Type> typesAdded = new HashSet<>();
for (final DataType option : options) { for (final DataType option : options) {
unionTypes.add(buildAvroSchema(option, fieldName, false)); final Schema optionSchema = buildAvroSchema(option, fieldName, false);
if (!typesAdded.contains(optionSchema.getType())) {
unionTypes.add(optionSchema);
typesAdded.add(optionSchema.getType());
}
} }
schema = Schema.createUnion(unionTypes); schema = Schema.createUnion(unionTypes);
@ -213,6 +226,17 @@ public class AvroTypeUtil {
} }
private static Schema nullable(final Schema schema) { private static Schema nullable(final Schema schema) {
if (schema.getType() == Type.UNION) {
final List<Schema> unionTypes = new ArrayList<>(schema.getTypes());
final Schema nullSchema = Schema.create(Type.NULL);
if (unionTypes.contains(nullSchema)) {
return schema;
}
unionTypes.add(nullSchema);
return Schema.createUnion(unionTypes);
}
return Schema.createUnion(Schema.create(Type.NULL), schema); return Schema.createUnion(Schema.create(Type.NULL), schema);
} }

View File

@ -453,6 +453,8 @@ public class QueryRecord extends AbstractProcessor {
return new QueryResult() { return new QueryResult() {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
table.close();
final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql); final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
if (statementQueue == null || !statementQueue.offer(cachedStatement)) { if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
try { try {

View File

@ -165,7 +165,9 @@ public class UpdateRecord extends AbstractRecordProcessor {
} }
} else { } else {
final PropertyValue replacementValue = context.getProperty(recordPathText); final PropertyValue replacementValue = context.getProperty(recordPathText);
final Map<String, String> fieldVariables = new HashMap<>(4);
if (replacementValue.isExpressionLanguagePresent()) {
final Map<String, String> fieldVariables = new HashMap<>();
result.getSelectedFields().forEach(fieldVal -> { result.getSelectedFields().forEach(fieldVal -> {
fieldVariables.clear(); fieldVariables.clear();
@ -176,6 +178,10 @@ public class UpdateRecord extends AbstractRecordProcessor {
final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue(); final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
fieldVal.updateValue(evaluatedReplacementVal); fieldVal.updateValue(evaluatedReplacementVal);
}); });
} else {
final String evaluatedReplacementVal = replacementValue.getValue();
result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(evaluatedReplacementVal));
}
} }
} }

View File

@ -18,9 +18,12 @@ package org.apache.nifi.queryrecord;
import java.io.InputStream; import java.io.InputStream;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.AbstractEnumerable;
@ -47,6 +50,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
@ -63,6 +67,8 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
private volatile FlowFile flowFile; private volatile FlowFile flowFile;
private volatile int maxRecordsRead; private volatile int maxRecordsRead;
private final Set<FlowFileEnumerator<?>> enumerators = new HashSet<>();
/** /**
* Creates a FlowFile table. * Creates a FlowFile table.
*/ */
@ -85,6 +91,14 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
return "FlowFileTable"; return "FlowFileTable";
} }
public void close() {
synchronized (enumerators) {
for (final FlowFileEnumerator<?> enumerator : enumerators) {
enumerator.close();
}
}
}
/** /**
* Returns an enumerable over a given projection of the fields. * Returns an enumerable over a given projection of the fields.
* *
@ -96,7 +110,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
@Override @Override
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public Enumerator<Object> enumerator() { public Enumerator<Object> enumerator() {
return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) { final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) {
@Override @Override
protected void onFinish() { protected void onFinish() {
final int recordCount = getRecordsRead(); final int recordCount = getRecordsRead();
@ -104,7 +118,21 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
maxRecordsRead = recordCount; maxRecordsRead = recordCount;
} }
} }
@Override
public void close() {
synchronized (enumerators) {
enumerators.remove(this);
}
super.close();
}
}; };
synchronized (enumerators) {
enumerators.add(flowFileEnumerator);
}
return flowFileEnumerator;
} }
}; };
} }
@ -203,9 +231,13 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
case ARRAY: case ARRAY:
return typeFactory.createJavaType(Object[].class); return typeFactory.createJavaType(Object[].class);
case RECORD: case RECORD:
return typeFactory.createJavaType(Object.class); return typeFactory.createJavaType(Record.class);
case MAP: case MAP:
return typeFactory.createJavaType(HashMap.class); return typeFactory.createJavaType(HashMap.class);
case BIGINT:
return typeFactory.createJavaType(BigInteger.class);
case CHOICE:
return typeFactory.createJavaType(Object.class);
} }
throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType); throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);

View File

@ -64,6 +64,7 @@ public class TestUpdateRecord {
public void testLiteralReplacementValue() { public void testLiteralReplacementValue() {
runner.setProperty("/name", "Jane Doe"); runner.setProperty("/name", "Jane Doe");
runner.enqueue(""); runner.enqueue("");
runner.setValidateExpressionUsage(false);
readerService.addRecord("John Doe", 35); readerService.addRecord("John Doe", 35);
runner.run(); runner.run();
@ -188,6 +189,7 @@ public class TestUpdateRecord {
public void testUpdateInArray() throws InitializationException, IOException { public void testUpdateInArray() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader(); final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader); runner.addControllerService("reader", jsonReader);
runner.setValidateExpressionUsage(false);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
@ -218,6 +220,7 @@ public class TestUpdateRecord {
public void testUpdateInNullArray() throws InitializationException, IOException { public void testUpdateInNullArray() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader(); final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader); runner.addControllerService("reader", jsonReader);
runner.setValidateExpressionUsage(false);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));

View File

@ -23,7 +23,7 @@ import java.io.InputStreamReader;
import java.io.Reader; import java.io.Reader;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -41,6 +41,8 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -53,7 +55,7 @@ public class CSVRecordReader implements RecordReader {
private final Supplier<DateFormat> LAZY_TIME_FORMAT; private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private List<String> rawFieldNames; private List<RecordField> recordFields;
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException { final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
@ -87,31 +89,37 @@ public class CSVRecordReader implements RecordReader {
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
final RecordSchema schema = getSchema(); final RecordSchema schema = getSchema();
final List<String> rawFieldNames = getRawFieldNames(); final List<RecordField> recordFields = getRecordFields();
final int numFieldNames = rawFieldNames.size(); final int numFieldNames = recordFields.size();
for (final CSVRecord csvRecord : csvParser) { for (final CSVRecord csvRecord : csvParser) {
final Map<String, Object> values = new LinkedHashMap<>(); final Map<String, Object> values = new HashMap<>(recordFields.size() * 2);
for (int i = 0; i < csvRecord.size(); i++) { for (int i = 0; i < csvRecord.size(); i++) {
final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
final String rawValue = csvRecord.get(i); final String rawValue = csvRecord.get(i);
final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName); final String rawFieldName;
final DataType dataType;
if (!dataTypeOption.isPresent() && dropUnknownFields) { if (i >= numFieldNames) {
continue; if (!dropUnknownFields) {
values.put("unknown_field_index_" + i, rawValue);
} }
continue;
} else {
final RecordField recordField = recordFields.get(i);
rawFieldName = recordField.getFieldName();
dataType = recordField.getDataType();
}
final Object value; final Object value;
if (coerceTypes && dataTypeOption.isPresent()) { if (coerceTypes) {
value = convert(rawValue, dataTypeOption.get(), rawFieldName); value = convert(rawValue, dataType, rawFieldName);
} else if (dataTypeOption.isPresent()) { } else {
// The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
// dictate a field type. As a result, we will use the schema that we have to attempt to convert // dictate a field type. As a result, we will use the schema that we have to attempt to convert
// the value into the desired type if it's a simple type. // the value into the desired type if it's a simple type.
value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName); value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
} else {
value = rawValue;
} }
values.put(rawFieldName, value); values.put(rawFieldName, value);
@ -124,9 +132,9 @@ public class CSVRecordReader implements RecordReader {
} }
private List<String> getRawFieldNames() { private List<RecordField> getRecordFields() {
if (this.rawFieldNames != null) { if (this.recordFields != null) {
return this.rawFieldNames; return this.recordFields;
} }
// Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
@ -135,8 +143,19 @@ public class CSVRecordReader implements RecordReader {
sortedMap.put(entry.getValue(), entry.getKey()); sortedMap.put(entry.getValue(), entry.getKey());
} }
this.rawFieldNames = new ArrayList<>(sortedMap.values()); final List<RecordField> fields = new ArrayList<>();
return this.rawFieldNames; final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
for (final String rawFieldName : rawFieldNames) {
final Optional<RecordField> option = schema.getField(rawFieldName);
if (option.isPresent()) {
fields.add(option.get());
} else {
fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType()));
}
}
this.recordFields = fields;
return fields;
} }

View File

@ -17,11 +17,19 @@
package org.apache.nifi.csv; package org.apache.nifi.csv;
import com.fasterxml.jackson.databind.MappingIterator; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectReader; import java.io.InputStream;
import com.fasterxml.jackson.dataformat.csv.CsvMapper; import java.io.InputStreamReader;
import com.fasterxml.jackson.dataformat.csv.CsvParser; import java.io.Reader;
import com.fasterxml.jackson.dataformat.csv.CsvSchema; import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVFormat;
import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang3.CharUtils; import org.apache.commons.lang3.CharUtils;
@ -35,18 +43,11 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException; import com.fasterxml.jackson.databind.MappingIterator;
import java.io.InputStream; import com.fasterxml.jackson.databind.ObjectReader;
import java.io.InputStreamReader; import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import java.io.Reader; import com.fasterxml.jackson.dataformat.csv.CsvParser;
import java.text.DateFormat; import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
public class JacksonCSVRecordReader implements RecordReader { public class JacksonCSVRecordReader implements RecordReader {
@ -140,6 +141,7 @@ public class JacksonCSVRecordReader implements RecordReader {
} }
} }
} }
// Check for empty lines and ignore them // Check for empty lines and ignore them
boolean foundRecord = true; boolean foundRecord = true;
if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) { if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
@ -154,12 +156,13 @@ public class JacksonCSVRecordReader implements RecordReader {
} }
} }
} }
// If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return // If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return
if (!foundRecord) { if (!foundRecord) {
return null; return null;
} }
final Map<String, Object> values = new LinkedHashMap<>(); final Map<String, Object> values = new HashMap<>(rawFieldNames.size() * 2);
final int numFieldNames = rawFieldNames.size(); final int numFieldNames = rawFieldNames.size();
for (int i = 0; i < csvRecord.length; i++) { for (int i = 0; i < csvRecord.length; i++) {
final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i); final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);

View File

@ -23,7 +23,6 @@ import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -84,20 +83,52 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown); return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown);
} }
private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
if (jsonNode.has(field.getFieldName())) {
return jsonNode.get(field.getFieldName());
}
for (final String alias : field.getAliases()) {
if (jsonNode.has(alias)) {
return jsonNode.get(alias);
}
}
return null;
}
private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
final Map<String, Object> values = new LinkedHashMap<>(); final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
if (dropUnknown) {
for (final RecordField recordField : schema.getFields()) {
final JsonNode childNode = getChildNode(jsonNode, recordField);
if (childNode == null) {
continue;
}
final String fieldName = recordField.getFieldName();
final Object value;
if (coerceTypes) {
final DataType desiredType = recordField.getDataType();
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
} else {
value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
}
values.put(fieldName, value);
}
} else {
final Iterator<String> fieldNames = jsonNode.getFieldNames(); final Iterator<String> fieldNames = jsonNode.getFieldNames();
while (fieldNames.hasNext()) { while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next(); final String fieldName = fieldNames.next();
final JsonNode childNode = jsonNode.get(fieldName); final JsonNode childNode = jsonNode.get(fieldName);
final RecordField recordField = schema.getField(fieldName).orElse(null); final RecordField recordField = schema.getField(fieldName).orElse(null);
if (recordField == null && dropUnknown) {
continue;
}
final Object value; final Object value;
if (coerceTypes && recordField != null) { if (coerceTypes && recordField != null) {
@ -110,6 +141,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
values.put(fieldName, value); values.put(fieldName, value);
} }
}
final Supplier<String> supplier = () -> jsonNode.toString(); final Supplier<String> supplier = () -> jsonNode.toString();
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown); return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.math.BigInteger; import java.math.BigInteger;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -117,26 +118,31 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
public Map<String, String> writeRecord(final Record record) throws IOException { public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the // If we are not writing an active record set, then we need to ensure that we write the
// schema information. // schema information.
boolean firstRecord = false;
if (!isActiveRecordSet()) { if (!isActiveRecordSet()) {
generator.flush(); generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream()); schemaAccess.writeHeader(recordSchema, getOutputStream());
firstRecord = true;
} }
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true); writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
return schemaAccess.getAttributes(recordSchema); return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
} }
@Override @Override
public WriteResult writeRawRecord(final Record record) throws IOException { public WriteResult writeRawRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the // If we are not writing an active record set, then we need to ensure that we write the
// schema information. // schema information.
boolean firstRecord = false;
if (!isActiveRecordSet()) { if (!isActiveRecordSet()) {
generator.flush(); generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream()); schemaAccess.writeHeader(recordSchema, getOutputStream());
firstRecord = true;
} }
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false); writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
return WriteResult.of(incrementRecordCount(), schemaAccess.getAttributes(recordSchema)); final Map<String, String> attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
return WriteResult.of(incrementRecordCount(), attributes);
} }
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,