mirror of https://github.com/apache/nifi.git
NIFI-12153 Added Allow Comments and Max String Length to JSON Readers
This closes #7823 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
ae61ebb5ed
commit
099ceec7ed
|
@ -17,14 +17,17 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
|
@ -51,16 +54,37 @@ import java.util.function.BiPredicate;
|
|||
import java.util.function.Supplier;
|
||||
|
||||
public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
|
||||
|
||||
static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("Max String Length")
|
||||
.displayName("Max String Length")
|
||||
.description("The maximum allowed length of a string value when parsing the JSON document")
|
||||
.required(true)
|
||||
.defaultValue(DEFAULT_MAX_STRING_LENGTH)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ALLOW_COMMENTS = new PropertyDescriptor.Builder()
|
||||
.name("Allow Comments")
|
||||
.displayName("Allow Comments")
|
||||
.description("Whether to allow comments when parsing the JSON document")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
|
||||
.maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue())
|
||||
.build();
|
||||
|
||||
private final ComponentLog logger;
|
||||
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||
private final Supplier<DateFormat> lazyDateFormat;
|
||||
private final Supplier<DateFormat> lazyTimeFormat;
|
||||
private final Supplier<DateFormat> lazyTimestampFormat;
|
||||
|
||||
private boolean firstObjectConsumed = false;
|
||||
|
||||
private static final JsonFactory jsonFactory = new JsonFactory();
|
||||
private static final ObjectMapper codec = new ObjectMapper();
|
||||
private JsonParser jsonParser;
|
||||
private JsonNode firstJsonNode;
|
||||
private StartingFieldStrategy strategy;
|
||||
|
@ -75,9 +99,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||
|
||||
LAZY_DATE_FORMAT = () -> df;
|
||||
LAZY_TIME_FORMAT = () -> tf;
|
||||
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||
lazyDateFormat = () -> df;
|
||||
lazyTimeFormat = () -> tf;
|
||||
lazyTimestampFormat = () -> tsf;
|
||||
}
|
||||
|
||||
protected AbstractJsonRowRecordReader(final InputStream in,
|
||||
|
@ -87,7 +111,19 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
final String timestampFormat)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null);
|
||||
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, false, null);
|
||||
}
|
||||
|
||||
protected AbstractJsonRowRecordReader(final InputStream in,
|
||||
final ComponentLog logger,
|
||||
final String dateFormat,
|
||||
final String timeFormat,
|
||||
final String timestampFormat,
|
||||
final boolean allowComments,
|
||||
final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -100,8 +136,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
* @param timestampFormat format for parsing timestamp fields
|
||||
* @param strategy whether to start processing from a specific field
|
||||
* @param nestedFieldName the name of the field to start the processing from
|
||||
* @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
|
||||
* @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
|
||||
* be accessed by calling {@link #getCapturedFields()}
|
||||
* @param allowComments whether to allow comments within the JSON stream
|
||||
* @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints}
|
||||
*
|
||||
* @throws IOException in case of JSON stream processing failure
|
||||
* @throws MalformedRecordException in case of malformed JSON input
|
||||
*/
|
||||
|
@ -112,7 +151,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
final String timestampFormat,
|
||||
final StartingFieldStrategy strategy,
|
||||
final String nestedFieldName,
|
||||
final BiPredicate<String, String> captureFieldPredicate)
|
||||
final BiPredicate<String, String> captureFieldPredicate,
|
||||
final boolean allowComments,
|
||||
final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(logger, dateFormat, timeFormat, timestampFormat);
|
||||
|
@ -122,7 +163,13 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
capturedFields = new LinkedHashMap<>();
|
||||
|
||||
try {
|
||||
jsonParser = jsonFactory.createParser(in);
|
||||
final ObjectMapper codec = new ObjectMapper();
|
||||
if (allowComments) {
|
||||
codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
|
||||
}
|
||||
codec.getFactory().setStreamReadConstraints(streamReadConstraints != null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);
|
||||
|
||||
jsonParser = codec.getFactory().createParser(in);
|
||||
jsonParser.setCodec(codec);
|
||||
|
||||
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
|
||||
|
@ -152,15 +199,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
}
|
||||
|
||||
protected Supplier<DateFormat> getLazyDateFormat() {
|
||||
return LAZY_DATE_FORMAT;
|
||||
return lazyDateFormat;
|
||||
}
|
||||
|
||||
protected Supplier<DateFormat> getLazyTimeFormat() {
|
||||
return LAZY_TIME_FORMAT;
|
||||
return lazyTimeFormat;
|
||||
}
|
||||
|
||||
protected Supplier<DateFormat> getLazyTimestampFormat() {
|
||||
return LAZY_TIMESTAMP_FORMAT;
|
||||
return lazyTimestampFormat;
|
||||
}
|
||||
|
||||
|
||||
|
@ -219,7 +266,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
|||
case TIME:
|
||||
case TIMESTAMP:
|
||||
try {
|
||||
return DataTypeUtils.convertType(textValue, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||
return DataTypeUtils.convertType(textValue, dataType, lazyDateFormat, lazyTimeFormat, lazyTimestampFormat, fieldName);
|
||||
} catch (final Exception e) {
|
||||
return textValue;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
|
@ -52,12 +53,20 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
|||
private final ComponentLog logger;
|
||||
private final LinkedHashMap<String, JsonPath> jsonPaths;
|
||||
private final InputStream in;
|
||||
private RecordSchema schema;
|
||||
private final RecordSchema schema;
|
||||
|
||||
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||
throws MalformedRecordException, IOException {
|
||||
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null);
|
||||
}
|
||||
|
||||
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
throws MalformedRecordException, IOException {
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat);
|
||||
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
|
||||
|
||||
this.schema = schema;
|
||||
this.jsonPaths = jsonPaths;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
@ -52,8 +53,19 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
|||
private final RecordSchema schema;
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, false, null);
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null,
|
||||
allowComments, streamReadConstraints);
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
|
@ -62,7 +74,20 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
|||
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate);
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
|
||||
captureFieldPredicate, false, null);
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
|
||||
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate,
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate,
|
||||
allowComments, streamReadConstraints);
|
||||
|
||||
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
|
||||
this.schema = getSelectedSchema(schema, startingFieldName);
|
||||
} else {
|
||||
|
|
|
@ -72,7 +72,7 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("max-string-length")
|
||||
.name("Max String Length")
|
||||
.displayName("Max String Length")
|
||||
.description("The maximum allowed length of a string value when parsing the JSON document")
|
||||
.required(true)
|
||||
|
|
|
@ -156,7 +156,7 @@ public class JoltTransformJSON extends AbstractProcessor {
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("max-string-length")
|
||||
.name("Max String Length")
|
||||
.displayName("Max String Length")
|
||||
.description("The maximum allowed length of a string value when parsing the JSON document")
|
||||
.required(true)
|
||||
|
|
|
@ -195,14 +195,17 @@
|
|||
<exclude>src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv</exclude>
|
||||
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
|
||||
<exclude>src/test/resources/csv/prov-events.csv</exclude>
|
||||
|
||||
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
|
||||
<exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
|
||||
<exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
|
||||
<exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
|
||||
<exclude>src/test/resources/grok/grok_patterns.txt</exclude>
|
||||
|
||||
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-array.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-comments.jsonc</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-mixed.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-multiarray.json</exclude>
|
||||
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
|
||||
|
@ -235,11 +238,14 @@
|
|||
<exclude>src/test/resources/json/choice-of-string-or-array-record.avsc</exclude>
|
||||
<exclude>src/test/resources/json/nested-choice-of-empty-array-or-string.json</exclude>
|
||||
<exclude>src/test/resources/json/nested-choice-of-record-array-or-string.json</exclude>
|
||||
|
||||
<exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
|
||||
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
|
||||
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
|
||||
<exclude>src/test/resources/syslog/syslog5424/log_mix_in_error.txt</exclude>
|
||||
|
||||
<exclude>src/test/resources/text/testschema</exclude>
|
||||
|
||||
<exclude>src/test/resources/xml/field_with_sub-element.xml</exclude>
|
||||
<exclude>src/test/resources/xml/people.xml</exclude>
|
||||
<exclude>src/test/resources/xml/people2.xml</exclude>
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
|
@ -32,6 +33,7 @@ import org.apache.nifi.context.PropertyContext;
|
|||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.schema.access.SchemaAccessStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.schema.inference.RecordSourceFactory;
|
||||
|
@ -69,23 +71,24 @@ import java.util.function.Supplier;
|
|||
description="User-defined properties identify how to extract specific fields from a JSON object in order to create a Record",
|
||||
expressionLanguageScope=ExpressionLanguageScope.NONE)
|
||||
public class JsonPathReader extends SchemaRegistryService implements RecordReaderFactory {
|
||||
|
||||
|
||||
private volatile String dateFormat;
|
||||
private volatile String timeFormat;
|
||||
private volatile String timestampFormat;
|
||||
private volatile LinkedHashMap<String, JsonPath> jsonPaths;
|
||||
private volatile boolean allowComments;
|
||||
private volatile StreamReadConstraints streamReadConstraints;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
|
||||
properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
|
||||
properties.add(DateTimeUtils.DATE_FORMAT);
|
||||
properties.add(DateTimeUtils.TIME_FORMAT);
|
||||
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
|
@ -103,6 +106,10 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
|
|||
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
|
||||
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
|
||||
|
||||
final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
|
||||
this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
|
||||
this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
|
||||
|
||||
final LinkedHashMap<String, JsonPath> compiled = new LinkedHashMap<>();
|
||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
||||
if (!descriptor.isDynamic()) {
|
||||
|
@ -164,7 +171,6 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
|
|||
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
|
||||
throws IOException, MalformedRecordException, SchemaNotFoundException {
|
||||
final RecordSchema schema = getSchema(variables, in, null);
|
||||
return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
|
||||
return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
|
@ -27,6 +28,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
|||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.schema.access.SchemaAccessStrategy;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
|
@ -68,13 +70,14 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
|
|||
+ "See the Usage of the Controller Service for more information and examples.")
|
||||
@SeeAlso(JsonPathReader.class)
|
||||
public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
|
||||
|
||||
private volatile String dateFormat;
|
||||
private volatile String timeFormat;
|
||||
private volatile String timestampFormat;
|
||||
private volatile String startingFieldName;
|
||||
private volatile StartingFieldStrategy startingFieldStrategy;
|
||||
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
|
||||
private volatile boolean allowComments;
|
||||
private volatile StreamReadConstraints streamReadConstraints;
|
||||
|
||||
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("starting-field-strategy")
|
||||
|
@ -119,6 +122,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
|||
properties.add(STARTING_FIELD_STRATEGY);
|
||||
properties.add(STARTING_FIELD_NAME);
|
||||
properties.add(SCHEMA_APPLICATION_STRATEGY);
|
||||
properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
|
||||
properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
|
||||
properties.add(DateTimeUtils.DATE_FORMAT);
|
||||
properties.add(DateTimeUtils.TIME_FORMAT);
|
||||
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
|
||||
|
@ -133,6 +138,9 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
|||
this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
|
||||
this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
|
||||
this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
|
||||
final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
|
||||
this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
|
||||
this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,6 +173,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
|||
throws IOException, MalformedRecordException, SchemaNotFoundException {
|
||||
final RecordSchema schema = getSchema(variables, in, null);
|
||||
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
|
||||
schemaApplicationStrategy, null);
|
||||
schemaApplicationStrategy, null, allowComments, streamReadConstraints);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.core.exc.StreamConstraintsException;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
|
@ -296,11 +298,35 @@ class TestJsonTreeRowRecordReader {
|
|||
|
||||
@Test
|
||||
void testReadMultilineJSON() throws IOException, MalformedRecordException {
|
||||
testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadJSONStringTooLong() {
|
||||
final StreamConstraintsException mre = assertThrows(StreamConstraintsException.class, () ->
|
||||
testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, StreamReadConstraints.builder().maxStringLength(2).build()));
|
||||
assertTrue(mre.getMessage().contains("maximum length"));
|
||||
assertTrue(mre.getMessage().contains("(2)"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadJSONComments() throws IOException, MalformedRecordException {
|
||||
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", true, StreamReadConstraints.builder().maxStringLength(20_000).build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadJSONDisallowComments() {
|
||||
final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () ->
|
||||
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
|
||||
assertTrue(mre.getMessage().contains("not parse"));
|
||||
}
|
||||
|
||||
private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException {
|
||||
final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json");
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
|
||||
try (final InputStream in = new FileInputStream(inputFile);
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
[
|
||||
{
|
||||
// comment in object
|
||||
"id": 1,
|
||||
"name": "John Doe",
|
||||
"balance": 4750.89,
|
||||
"address": "123 My Street",
|
||||
"city": "My City",
|
||||
"state": "MS",
|
||||
"zipCode": "11111",
|
||||
"country": "USA"
|
||||
},
|
||||
// comment between objects
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Jane Doe",
|
||||
"balance": 4820.09,
|
||||
"address": "321 Your Street",
|
||||
"city": "Your City",
|
||||
"state": "NY",
|
||||
"zipCode": "33333",
|
||||
"country": "USA"
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue