mirror of https://github.com/apache/nifi.git
NIFI-3990: This closes #1870. Avoided creating unnecessary garbage in the JSON and CSV readers and the JSON writer
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
067e9dfeb0
commit
37be0b9820
|
@ -24,6 +24,7 @@ import java.io.Reader;
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.csv.CSVFormat;
|
import org.apache.commons.csv.CSVFormat;
|
||||||
import org.apache.commons.csv.CSVParser;
|
import org.apache.commons.csv.CSVParser;
|
||||||
|
@ -43,17 +44,22 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
public class CSVRecordReader implements RecordReader {
|
public class CSVRecordReader implements RecordReader {
|
||||||
private final CSVParser csvParser;
|
private final CSVParser csvParser;
|
||||||
private final RecordSchema schema;
|
private final RecordSchema schema;
|
||||||
private final DateFormat dateFormat;
|
|
||||||
private final DateFormat timeFormat;
|
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||||
private final DateFormat timestampFormat;
|
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||||
|
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||||
|
|
||||||
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
|
public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
|
||||||
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
||||||
|
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||||
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||||
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||||
|
|
||||||
|
LAZY_DATE_FORMAT = () -> df;
|
||||||
|
LAZY_TIME_FORMAT = () -> tf;
|
||||||
|
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||||
|
|
||||||
final Reader reader = new InputStreamReader(new BOMInputStream(in));
|
final Reader reader = new InputStreamReader(new BOMInputStream(in));
|
||||||
final CSVFormat withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
|
final CSVFormat withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
|
||||||
|
@ -114,7 +120,7 @@ public class CSVRecordReader implements RecordReader {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return DataTypeUtils.convertType(trimmed, dataType, () -> dateFormat, () -> timeFormat, () -> timestampFormat, fieldName);
|
return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.serialization.MalformedRecordException;
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
|
@ -53,18 +54,23 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||||
private final LinkedHashMap<String, JsonPath> jsonPaths;
|
private final LinkedHashMap<String, JsonPath> jsonPaths;
|
||||||
private final InputStream in;
|
private final InputStream in;
|
||||||
private RecordSchema schema;
|
private RecordSchema schema;
|
||||||
private final DateFormat dateFormat;
|
|
||||||
private final DateFormat timeFormat;
|
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||||
private final DateFormat timestampFormat;
|
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||||
|
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||||
|
|
||||||
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
||||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||||
throws MalformedRecordException, IOException {
|
throws MalformedRecordException, IOException {
|
||||||
super(in, logger);
|
super(in, logger);
|
||||||
|
|
||||||
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||||
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||||
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||||
|
|
||||||
|
LAZY_DATE_FORMAT = () -> df;
|
||||||
|
LAZY_TIME_FORMAT = () -> tf;
|
||||||
|
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||||
|
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
this.jsonPaths = jsonPaths;
|
this.jsonPaths = jsonPaths;
|
||||||
|
@ -162,7 +168,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||||
|
|
||||||
return new MapRecord(childSchema, coercedValues);
|
return new MapRecord(childSchema, coercedValues);
|
||||||
} else {
|
} else {
|
||||||
return DataTypeUtils.convertType(value, dataType, () -> dateFormat, () -> timeFormat, () -> timestampFormat, fieldName);
|
return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,18 +47,24 @@ import org.codehaus.jackson.node.ArrayNode;
|
||||||
|
|
||||||
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||||
private final RecordSchema schema;
|
private final RecordSchema schema;
|
||||||
private final DateFormat dateFormat;
|
|
||||||
private final DateFormat timeFormat;
|
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||||
private final DateFormat timestampFormat;
|
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||||
|
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||||
|
|
||||||
|
|
||||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||||
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
|
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
|
||||||
super(in, logger);
|
super(in, logger);
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
|
|
||||||
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||||
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||||
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||||
|
|
||||||
|
LAZY_DATE_FORMAT = () -> df;
|
||||||
|
LAZY_TIME_FORMAT = () -> tf;
|
||||||
|
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -128,13 +134,13 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||||
return DataTypeUtils.toShort(getRawNodeValue(fieldNode), fieldName);
|
return DataTypeUtils.toShort(getRawNodeValue(fieldNode), fieldName);
|
||||||
case STRING:
|
case STRING:
|
||||||
return DataTypeUtils.toString(getRawNodeValue(fieldNode),
|
return DataTypeUtils.toString(getRawNodeValue(fieldNode),
|
||||||
() -> DataTypeUtils.getDateFormat(desiredType.getFieldType(), () -> dateFormat, () -> timeFormat, () -> timestampFormat));
|
() -> DataTypeUtils.getDateFormat(desiredType.getFieldType(), LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT));
|
||||||
case DATE:
|
case DATE:
|
||||||
return DataTypeUtils.toDate(getRawNodeValue(fieldNode), () -> dateFormat, fieldName);
|
return DataTypeUtils.toDate(getRawNodeValue(fieldNode), LAZY_DATE_FORMAT, fieldName);
|
||||||
case TIME:
|
case TIME:
|
||||||
return DataTypeUtils.toTime(getRawNodeValue(fieldNode), () -> timeFormat, fieldName);
|
return DataTypeUtils.toTime(getRawNodeValue(fieldNode), LAZY_TIME_FORMAT, fieldName);
|
||||||
case TIMESTAMP:
|
case TIMESTAMP:
|
||||||
return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), () -> timestampFormat, fieldName);
|
return DataTypeUtils.toTimestamp(getRawNodeValue(fieldNode), LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||||
case MAP: {
|
case MAP: {
|
||||||
final DataType valueType = ((MapDataType) desiredType).getValueType();
|
final DataType valueType = ((MapDataType) desiredType).getValueType();
|
||||||
|
|
||||||
|
@ -143,7 +149,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||||
while (fieldNameItr.hasNext()) {
|
while (fieldNameItr.hasNext()) {
|
||||||
final String childName = fieldNameItr.next();
|
final String childName = fieldNameItr.next();
|
||||||
final JsonNode childNode = fieldNode.get(childName);
|
final JsonNode childNode = fieldNode.get(childName);
|
||||||
final Object childValue = convertField(childNode, fieldName + "." + childName, valueType);
|
final Object childValue = convertField(childNode, fieldName, valueType);
|
||||||
map.put(childName, childValue);
|
map.put(childName, childValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.math.BigInteger;
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||||
|
@ -48,10 +49,10 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
private final SchemaAccessWriter schemaAccess;
|
private final SchemaAccessWriter schemaAccess;
|
||||||
private final RecordSchema recordSchema;
|
private final RecordSchema recordSchema;
|
||||||
private final JsonFactory factory = new JsonFactory();
|
private final JsonFactory factory = new JsonFactory();
|
||||||
private final DateFormat dateFormat;
|
|
||||||
private final DateFormat timeFormat;
|
|
||||||
private final DateFormat timestampFormat;
|
|
||||||
private final JsonGenerator generator;
|
private final JsonGenerator generator;
|
||||||
|
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||||
|
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||||
|
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||||
|
|
||||||
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
|
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
|
||||||
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
||||||
|
@ -61,9 +62,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
this.recordSchema = recordSchema;
|
this.recordSchema = recordSchema;
|
||||||
this.schemaAccess = schemaAccess;
|
this.schemaAccess = schemaAccess;
|
||||||
|
|
||||||
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||||
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||||
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||||
|
|
||||||
|
LAZY_DATE_FORMAT = () -> df;
|
||||||
|
LAZY_TIME_FORMAT = () -> tf;
|
||||||
|
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||||
|
|
||||||
this.generator = factory.createJsonGenerator(out);
|
this.generator = factory.createJsonGenerator(out);
|
||||||
if (prettyPrint) {
|
if (prettyPrint) {
|
||||||
|
@ -164,7 +169,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
}
|
}
|
||||||
|
|
||||||
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
|
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
|
||||||
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, () -> dateFormat, () -> timeFormat, () -> timestampFormat, fieldName);
|
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||||
if (coercedValue == null) {
|
if (coercedValue == null) {
|
||||||
generator.writeNull();
|
generator.writeNull();
|
||||||
return;
|
return;
|
||||||
|
@ -172,7 +177,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
|
|
||||||
switch (chosenDataType.getFieldType()) {
|
switch (chosenDataType.getFieldType()) {
|
||||||
case DATE: {
|
case DATE: {
|
||||||
final String stringValue = DataTypeUtils.toString(coercedValue, () -> dateFormat);
|
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_DATE_FORMAT);
|
||||||
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
||||||
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
||||||
} else {
|
} else {
|
||||||
|
@ -181,7 +186,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TIME: {
|
case TIME: {
|
||||||
final String stringValue = DataTypeUtils.toString(coercedValue, () -> timeFormat);
|
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIME_FORMAT);
|
||||||
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
||||||
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
||||||
} else {
|
} else {
|
||||||
|
@ -190,7 +195,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TIMESTAMP: {
|
case TIMESTAMP: {
|
||||||
final String stringValue = DataTypeUtils.toString(coercedValue, () -> timestampFormat);
|
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIMESTAMP_FORMAT);
|
||||||
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
|
||||||
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue