mirror of https://github.com/apache/nifi.git
NIFI-12127 Allow Jackson's max string length to be configured on SplitJson and EvaluateJsonPath
This closes #7794 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
4b0c8bf6af
commit
1ac833654b
|
@ -16,20 +16,14 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
|
||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -38,8 +32,18 @@ import org.apache.nifi.flowfile.FlowFile;
|
|||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
|
||||
*
|
||||
|
@ -49,10 +53,6 @@ import org.apache.nifi.util.StringUtils;
|
|||
*/
|
||||
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||
|
||||
private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
|
||||
|
||||
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
|
||||
|
||||
static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>();
|
||||
|
||||
static final String EMPTY_STRING_OPTION = "empty string";
|
||||
|
@ -71,14 +71,33 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
|||
.defaultValue(EMPTY_STRING_OPTION)
|
||||
.build();
|
||||
|
||||
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("max-string-length")
|
||||
.displayName("Max String Length")
|
||||
.description("The maximum allowed length of a string value when parsing the JSON document")
|
||||
.required(true)
|
||||
.defaultValue("20 MB")
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
static Configuration createConfiguration(final int maxStringLength) {
|
||||
final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
|
||||
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
|
||||
|
||||
final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper);
|
||||
return Configuration.builder().jsonProvider(jsonProvider).build();
|
||||
}
|
||||
|
||||
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile, Configuration jsonPathConfiguration) {
|
||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||
final AtomicReference<DocumentContext> contextHolder = new AtomicReference<>(null);
|
||||
processSession.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
||||
DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream);
|
||||
DocumentContext ctx = JsonPath.using(jsonPathConfiguration).parse(bufferedInputStream);
|
||||
contextHolder.set(ctx);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// The JsonPath.parse() above first parses the json, then creates a context object from the parsed
|
||||
|
@ -109,11 +128,11 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
|||
return !(obj instanceof Map || obj instanceof List);
|
||||
}
|
||||
|
||||
static String getResultRepresentation(Object jsonPathResult, String defaultValue) {
|
||||
static String getResultRepresentation(JsonProvider jsonProvider, Object jsonPathResult, String defaultValue) {
|
||||
if (isJsonScalar(jsonPathResult)) {
|
||||
return Objects.toString(jsonPathResult, defaultValue);
|
||||
}
|
||||
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||
return jsonProvider.toJson(jsonPathResult);
|
||||
}
|
||||
|
||||
abstract static class JsonPathValidator implements Validator {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
|
@ -147,6 +149,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
private volatile String returnType;
|
||||
private volatile String pathNotFound;
|
||||
private volatile String nullDefaultValue;
|
||||
private volatile Configuration jsonPathConfiguration;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
|
@ -161,6 +164,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
props.add(RETURN_TYPE);
|
||||
props.add(PATH_NOT_FOUND);
|
||||
props.add(NULL_VALUE_DEFAULT_REPRESENTATION);
|
||||
props.add(MAX_STRING_LENGTH);
|
||||
this.properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
|
@ -248,6 +252,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
}
|
||||
pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
|
||||
nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
|
||||
|
||||
final int maxStringLength = processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
|
||||
jsonPathConfiguration = createConfiguration(maxStringLength);
|
||||
}
|
||||
|
||||
@OnUnscheduled
|
||||
|
@ -266,7 +273,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
|
||||
DocumentContext documentContext;
|
||||
try {
|
||||
documentContext = validateAndEstablishJsonContext(processSession, flowFile);
|
||||
documentContext = validateAndEstablishJsonContext(processSession, flowFile, jsonPathConfiguration);
|
||||
} catch (InvalidJsonException e) {
|
||||
logger.error("FlowFile {} did not have valid JSON content.", flowFile);
|
||||
processSession.transfer(flowFile, REL_FAILURE);
|
||||
|
@ -316,7 +323,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
final String resultRepresentation = getResultRepresentation(result, nullDefaultValue);
|
||||
final String resultRepresentation = getResultRepresentation(jsonPathConfiguration.jsonProvider(), result, nullDefaultValue);
|
||||
if (destinationIsAttribute) {
|
||||
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
||||
} else {
|
||||
|
|
|
@ -16,25 +16,18 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.SystemResource;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
|
@ -46,16 +39,24 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.jayway.jsonpath.DocumentContext;
|
||||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
|
||||
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
|
||||
|
@ -112,12 +113,14 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
|
||||
private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>();
|
||||
private volatile String nullDefaultValue;
|
||||
private volatile Configuration jsonPathConfiguration;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(ARRAY_JSON_PATH_EXPRESSION);
|
||||
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
|
||||
properties.add(MAX_STRING_LENGTH);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
|
@ -168,6 +171,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
@OnScheduled
|
||||
public void onScheduled(ProcessContext processContext) {
|
||||
nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
|
||||
|
||||
final int maxStringLength = processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
|
||||
jsonPathConfiguration = createConfiguration(maxStringLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -181,7 +187,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
|
||||
DocumentContext documentContext;
|
||||
try {
|
||||
documentContext = validateAndEstablishJsonContext(processSession, original);
|
||||
documentContext = validateAndEstablishJsonContext(processSession, original, jsonPathConfiguration);
|
||||
} catch (InvalidJsonException e) {
|
||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
||||
processSession.transfer(original, REL_FAILURE);
|
||||
|
@ -216,7 +222,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
Object resultSegment = resultList.get(i);
|
||||
FlowFile split = processSession.create(original);
|
||||
split = processSession.write(split, (out) -> {
|
||||
String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue);
|
||||
String resultSegmentContent = getResultRepresentation(jsonPathConfiguration.jsonProvider(), resultSegment, nullDefaultValue);
|
||||
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue