mirror of
https://github.com/apache/nifi.git
synced 2025-02-15 22:45:27 +00:00
NIFI-13542 Added missing Max String Length property for JSON Readers (#9084)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
21959560c5
commit
b9b5c03b2e
@ -76,7 +76,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
|
||||
static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
|
||||
.maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue())
|
||||
.build();
|
||||
|
||||
|
@ -19,11 +19,13 @@ package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
|
||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
@ -48,17 +50,17 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
|
||||
|
||||
private final ComponentLog logger;
|
||||
private final LinkedHashMap<String, JsonPath> jsonPaths;
|
||||
private final InputStream in;
|
||||
private final RecordSchema schema;
|
||||
private final Configuration providerConfiguration;
|
||||
|
||||
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||
throws MalformedRecordException, IOException {
|
||||
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null);
|
||||
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, DEFAULT_STREAM_READ_CONSTRAINTS);
|
||||
}
|
||||
|
||||
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
|
||||
@ -72,6 +74,11 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
this.jsonPaths = jsonPaths;
|
||||
this.in = in;
|
||||
this.logger = logger;
|
||||
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
|
||||
final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper);
|
||||
providerConfiguration = Configuration.builder().jsonProvider(jsonProvider).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -90,7 +97,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
return null;
|
||||
}
|
||||
|
||||
final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
|
||||
final DocumentContext ctx = JsonPath.using(providerConfiguration).parse(jsonNode.toString());
|
||||
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
|
||||
|
||||
for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
|
||||
|
@ -39,15 +39,20 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
|
||||
private final StartingFieldStrategy strategy;
|
||||
|
||||
public JsonRecordSource(final InputStream in) throws IOException {
|
||||
this(in, null, null);
|
||||
this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS);
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
|
||||
this(in , strategy, startingFieldName, new JsonParserFactory());
|
||||
public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException {
|
||||
this(in, null, null, streamReadConstraints);
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
|
||||
jsonParser = tokenParserFactory.getJsonParser(in, DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
|
||||
this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints);
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory,
|
||||
StreamReadConstraints streamReadConstraints) throws IOException {
|
||||
jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED);
|
||||
this.strategy = strategy;
|
||||
|
||||
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
|
||||
|
@ -21,12 +21,12 @@ import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
|
||||
import org.apache.nifi.json.TokenParserFactory;
|
||||
import org.yaml.snakeyaml.LoaderOptions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class YamlParserFactory implements TokenParserFactory {
|
||||
private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new YAMLMapper());
|
||||
|
||||
/**
|
||||
* Get Parser implementation for YAML
|
||||
@ -39,6 +39,12 @@ public class YamlParserFactory implements TokenParserFactory {
|
||||
*/
|
||||
@Override
|
||||
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
|
||||
return YAML_FACTORY.createParser(in);
|
||||
final LoaderOptions loaderOptions = new LoaderOptions();
|
||||
loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength());
|
||||
final YAMLFactory yamlFactory = YAMLFactory.builder()
|
||||
.loaderOptions(loaderOptions)
|
||||
.build();
|
||||
|
||||
return yamlFactory.setCodec(new YAMLMapper()).createParser(in);
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import org.apache.nifi.json.JsonRecordSource;
|
||||
import org.apache.nifi.json.StartingFieldStrategy;
|
||||
|
||||
@ -23,7 +24,7 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class YamlRecordSource extends JsonRecordSource {
|
||||
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
|
||||
super(in, strategy, startingFieldName, new YamlParserFactory());
|
||||
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
|
||||
super(in, strategy, startingFieldName, new YamlParserFactory(), streamReadConstraints);
|
||||
}
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
|
||||
|
||||
@Override
|
||||
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
|
||||
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
|
||||
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in, streamReadConstraints);
|
||||
final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
|
||||
|
||||
return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
|
||||
|
@ -76,8 +76,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
||||
protected volatile String startingFieldName;
|
||||
protected volatile StartingFieldStrategy startingFieldStrategy;
|
||||
protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
|
||||
protected volatile StreamReadConstraints streamReadConstraints;
|
||||
private volatile boolean allowComments;
|
||||
private volatile StreamReadConstraints streamReadConstraints;
|
||||
|
||||
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("starting-field-strategy")
|
||||
@ -182,7 +182,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
||||
}
|
||||
|
||||
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
|
||||
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
|
||||
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,13 +17,11 @@
|
||||
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.json.AbstractJsonRowRecordReader;
|
||||
import org.apache.nifi.json.JsonTreeReader;
|
||||
import org.apache.nifi.json.JsonTreeRowRecordReader;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
@ -51,17 +49,12 @@ public class YamlTreeReader extends JsonTreeReader {
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
// Remove those properties which are not applicable for YAML
|
||||
properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
|
||||
properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
|
||||
|
||||
return properties;
|
||||
return new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
|
||||
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName);
|
||||
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -70,11 +63,6 @@ public class YamlTreeReader extends JsonTreeReader {
|
||||
schemaApplicationStrategy, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
|
||||
return StreamReadConstraints.defaults();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
|
||||
return ALLOW_COMMENTS_DISABLED;
|
||||
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
|
||||
@ -237,7 +238,7 @@ class TestInferJsonSchemaAccessStrategy {
|
||||
final InputStream bufferedIn = new BufferedInputStream(in)) {
|
||||
|
||||
final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
|
||||
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName),
|
||||
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName, StreamReadConstraints.defaults()),
|
||||
timestampInference, Mockito.mock(ComponentLog.class)
|
||||
);
|
||||
|
||||
|
@ -1433,7 +1433,7 @@ class TestJsonTreeRowRecordReader {
|
||||
|
||||
private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
|
||||
RecordSchema schema = new InferSchemaAccessStrategy<>(
|
||||
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName),
|
||||
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
|
||||
new JsonSchemaInference(new TimeValueInference(null, null, null)),
|
||||
mock(ComponentLog.class)
|
||||
).getSchema(Collections.emptyMap(), jsonStream, null);
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
@ -1231,7 +1232,7 @@ class TestYamlTreeRowRecordReader {
|
||||
|
||||
private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
|
||||
RecordSchema schema = new InferSchemaAccessStrategy<>(
|
||||
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName),
|
||||
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
|
||||
new JsonSchemaInference(new TimeValueInference(null, null, null)),
|
||||
mock(ComponentLog.class)
|
||||
).getSchema(Collections.emptyMap(), jsonStream, null);
|
||||
|
Loading…
x
Reference in New Issue
Block a user