diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index 59f12770e9..d718e7e4bc 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -17,14 +17,17 @@ package org.apache.nifi.json; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -51,16 +54,37 @@ import java.util.function.BiPredicate; import java.util.function.Supplier; public abstract class AbstractJsonRowRecordReader implements RecordReader { + public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB"; + + public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder() + .name("Max String Length") + .displayName("Max String Length") + .description("The maximum allowed length of a string value when parsing the JSON document") + .required(true) + 
.defaultValue(DEFAULT_MAX_STRING_LENGTH) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor ALLOW_COMMENTS = new PropertyDescriptor.Builder() + .name("Allow Comments") + .displayName("Allow Comments") + .description("Whether to allow comments when parsing the JSON document") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder() + .maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue()) + .build(); private final ComponentLog logger; - private final Supplier LAZY_DATE_FORMAT; - private final Supplier LAZY_TIME_FORMAT; - private final Supplier LAZY_TIMESTAMP_FORMAT; + private final Supplier lazyDateFormat; + private final Supplier lazyTimeFormat; + private final Supplier lazyTimestampFormat; private boolean firstObjectConsumed = false; - - private static final JsonFactory jsonFactory = new JsonFactory(); - private static final ObjectMapper codec = new ObjectMapper(); private JsonParser jsonParser; private JsonNode firstJsonNode; private StartingFieldStrategy strategy; @@ -75,9 +99,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); final DateFormat tsf = timestampFormat == null ? 
null : DataTypeUtils.getDateFormat(timestampFormat); - LAZY_DATE_FORMAT = () -> df; - LAZY_TIME_FORMAT = () -> tf; - LAZY_TIMESTAMP_FORMAT = () -> tsf; + lazyDateFormat = () -> df; + lazyTimeFormat = () -> tf; + lazyTimestampFormat = () -> tsf; } protected AbstractJsonRowRecordReader(final InputStream in, @@ -87,7 +111,19 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final String timestampFormat) throws IOException, MalformedRecordException { - this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null); + this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, false, null); + } + + protected AbstractJsonRowRecordReader(final InputStream in, + final ComponentLog logger, + final String dateFormat, + final String timeFormat, + final String timestampFormat, + final boolean allowComments, + final StreamReadConstraints streamReadConstraints) + throws IOException, MalformedRecordException { + + this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints); } /** @@ -100,8 +136,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { * @param timestampFormat format for parsing timestamp fields * @param strategy whether to start processing from a specific field * @param nestedFieldName the name of the field to start the processing from - * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can + * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can * be accessed by calling {@link #getCapturedFields()} + * @param allowComments whether to allow comments within the JSON stream + * @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints} + * * @throws IOException in case of JSON stream processing failure * @throws 
MalformedRecordException in case of malformed JSON input */ @@ -112,7 +151,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final String timestampFormat, final StartingFieldStrategy strategy, final String nestedFieldName, - final BiPredicate captureFieldPredicate) + final BiPredicate captureFieldPredicate, + final boolean allowComments, + final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException { this(logger, dateFormat, timeFormat, timestampFormat); @@ -122,7 +163,13 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { capturedFields = new LinkedHashMap<>(); try { - jsonParser = jsonFactory.createParser(in); + final ObjectMapper codec = new ObjectMapper(); + if (allowComments) { + codec.enable(JsonParser.Feature.ALLOW_COMMENTS); + } + codec.getFactory().setStreamReadConstraints(streamReadConstraints != null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS); + + jsonParser = codec.getFactory().createParser(in); jsonParser.setCodec(codec); if (strategy == StartingFieldStrategy.NESTED_FIELD) { @@ -152,15 +199,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } protected Supplier getLazyDateFormat() { - return LAZY_DATE_FORMAT; + return lazyDateFormat; } protected Supplier getLazyTimeFormat() { - return LAZY_TIME_FORMAT; + return lazyTimeFormat; } protected Supplier getLazyTimestampFormat() { - return LAZY_TIMESTAMP_FORMAT; + return lazyTimestampFormat; } @@ -219,7 +266,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { case TIME: case TIMESTAMP: try { - return DataTypeUtils.convertType(textValue, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + return DataTypeUtils.convertType(textValue, dataType, lazyDateFormat, lazyTimeFormat, lazyTimestampFormat, fieldName); } catch (final Exception e) { return textValue; } diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index 21ff1c3920..328cf0c868 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; @@ -52,12 +53,20 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { private final ComponentLog logger; private final LinkedHashMap jsonPaths; private final InputStream in; - private RecordSchema schema; + private final RecordSchema schema; public JsonPathRowRecordReader(final LinkedHashMap jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, - final String dateFormat, final String timeFormat, final String timestampFormat) + final String dateFormat, final String timeFormat, final String timestampFormat) + throws MalformedRecordException, IOException { + this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null); + } + + public JsonPathRowRecordReader(final LinkedHashMap jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, + final String dateFormat, final String timeFormat, final String timestampFormat, + final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws MalformedRecordException, IOException { - super(in, logger, dateFormat, timeFormat, timestampFormat); + + 
super(in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints); this.schema = schema; this.jsonPaths = jsonPaths; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index fce50e35c9..758cd96a48 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.nifi.logging.ComponentLog; @@ -52,8 +53,19 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { private final RecordSchema schema; public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { - this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null); + final String dateFormat, final String timeFormat, final String timestampFormat) + throws IOException, MalformedRecordException { + + this(in, logger, schema, dateFormat, timeFormat, timestampFormat, false, null); + } + + public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, + final String dateFormat, final String timeFormat, final String timestampFormat, + final boolean allowComments, final StreamReadConstraints streamReadConstraints) + throws 
IOException, MalformedRecordException { + + this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, + allowComments, streamReadConstraints); } public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, @@ -62,7 +74,20 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate) throws IOException, MalformedRecordException { - super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate); + this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, + captureFieldPredicate, false, null); + } + + public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, + final String dateFormat, final String timeFormat, final String timestampFormat, + final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, + final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate, + final boolean allowComments, final StreamReadConstraints streamReadConstraints) + throws IOException, MalformedRecordException { + + super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, + allowComments, streamReadConstraints); + if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) { this.schema = getSelectedSchema(schema, startingFieldName); } else { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index aa72654b99..cc89fe98d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -72,7 +72,7 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder() - .name("max-string-length") + .name("Max String Length") .displayName("Max String Length") .description("The maximum allowed length of a string value when parsing the JSON document") .required(true) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java index 5a3ae9fca8..7bffe1a077 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java @@ -156,7 +156,7 @@ public class JoltTransformJSON extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder() - .name("max-string-length") + .name("Max String Length") .displayName("Max String Length") .description("The maximum allowed length of a string value when parsing the JSON document") .required(true) diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 5bc6e4e968..a9202268be 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -195,14 +195,17 @@ src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv src/test/resources/csv/multi-bank-account_spec_delimiter.csv src/test/resources/csv/prov-events.csv + src/test/resources/grok/error-with-stack-trace.log src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log src/test/resources/grok/nifi-log-sample.log src/test/resources/grok/single-line-log-messages.txt src/test/resources/grok/grok_patterns.txt + src/test/resources/json/bank-account-array-different-schemas.json src/test/resources/json/bank-account-array-optional-balance.json src/test/resources/json/bank-account-array.json + src/test/resources/json/bank-account-comments.jsonc src/test/resources/json/bank-account-mixed.json src/test/resources/json/bank-account-multiarray.json src/test/resources/json/bank-account-multiline.json @@ -235,11 +238,14 @@ src/test/resources/json/choice-of-string-or-array-record.avsc src/test/resources/json/nested-choice-of-empty-array-or-string.json src/test/resources/json/nested-choice-of-record-array-or-string.json + src/test/resources/syslog/syslog5424/log.txt src/test/resources/syslog/syslog5424/log_all.txt src/test/resources/syslog/syslog5424/log_mix.txt src/test/resources/syslog/syslog5424/log_mix_in_error.txt + src/test/resources/text/testschema + src/test/resources/xml/field_with_sub-element.xml src/test/resources/xml/people.xml src/test/resources/xml/people2.xml diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index caca7c2d72..297400c3d4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.jayway.jsonpath.JsonPath; import org.apache.nifi.annotation.behavior.DynamicProperty; @@ -32,6 +33,7 @@ import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.inference.RecordSourceFactory; @@ -69,23 +71,24 @@ import java.util.function.Supplier; description="User-defined properties identify how to extract specific fields from a JSON object in order to create a Record", expressionLanguageScope=ExpressionLanguageScope.NONE) public class JsonPathReader extends SchemaRegistryService implements RecordReaderFactory { - - private volatile String dateFormat; private volatile String timeFormat; private volatile String timestampFormat; private volatile LinkedHashMap jsonPaths; + private volatile boolean allowComments; + private volatile 
StreamReadConstraints streamReadConstraints; @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH); + properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS); properties.add(DateTimeUtils.DATE_FORMAT); properties.add(DateTimeUtils.TIME_FORMAT); properties.add(DateTimeUtils.TIMESTAMP_FORMAT); return properties; } - @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -103,6 +106,10 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); + this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean(); + final LinkedHashMap compiled = new LinkedHashMap<>(); for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { if (!descriptor.isDynamic()) { @@ -164,7 +171,6 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); - return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat); + return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, allowComments, 
streamReadConstraints); } - } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 1d4caa8dbc..bcf7d66c95 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -27,6 +28,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -68,13 +70,14 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE; + "See the Usage of the Controller Service for more information and examples.") @SeeAlso(JsonPathReader.class) public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory { - private volatile String dateFormat; private volatile String timeFormat; private volatile String timestampFormat; private volatile String startingFieldName; private volatile 
StartingFieldStrategy startingFieldStrategy; private volatile SchemaApplicationStrategy schemaApplicationStrategy; + private volatile boolean allowComments; + private volatile StreamReadConstraints streamReadConstraints; public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder() .name("starting-field-strategy") @@ -119,6 +122,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade properties.add(STARTING_FIELD_STRATEGY); properties.add(STARTING_FIELD_NAME); properties.add(SCHEMA_APPLICATION_STRATEGY); + properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH); + properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS); properties.add(DateTimeUtils.DATE_FORMAT); properties.add(DateTimeUtils.TIME_FORMAT); properties.add(DateTimeUtils.TIMESTAMP_FORMAT); @@ -133,6 +138,9 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue()); this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue(); this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue()); + final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); + this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean(); } @Override @@ -165,6 +173,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, - 
schemaApplicationStrategy, null); + schemaApplicationStrategy, null, allowComments, streamReadConstraints); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 1f77aa6318..7c7f715eff 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -17,6 +17,8 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.core.exc.StreamConstraintsException; import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; import org.apache.nifi.avro.AvroTypeUtil; @@ -296,11 +298,35 @@ class TestJsonTreeRowRecordReader { @Test void testReadMultilineJSON() throws IOException, MalformedRecordException { + testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, null); + } + + @Test + void testReadJSONStringTooLong() { + final StreamConstraintsException mre = assertThrows(StreamConstraintsException.class, () -> + testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, StreamReadConstraints.builder().maxStringLength(2).build())); + assertTrue(mre.getMessage().contains("maximum length")); + assertTrue(mre.getMessage().contains("(2)")); + } + + @Test + void testReadJSONComments() throws IOException, MalformedRecordException { + testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", 
true, StreamReadConstraints.builder().maxStringLength(20_000).build()); + } + + @Test + void testReadJSONDisallowComments() { + final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> + testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build())); + assertTrue(mre.getMessage().contains("not parse")); + } + + private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException { final List fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)); final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json"); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream(inputFile); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc new file mode 100644 index 0000000000..326a925400 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc @@ -0,0 +1,24 @@ +[ + { + // comment in object + "id": 1, + "name": "John Doe", + "balance": 4750.89, + "address": "123 My Street", + "city": "My City", + "state": "MS", + "zipCode": "11111", + "country": "USA" + }, + // comment between objects + { + "id": 2, + "name": "Jane Doe", + "balance": 4820.09, + "address": "321 Your Street", + "city": "Your City", + "state": "NY", + "zipCode": "33333", + "country": "USA" + } +] \ No newline at end of file