mirror of https://github.com/apache/nifi.git
Removing JsonUtils as all functionality was migrated into AbstractJsonPathProcessor given its limited utility outside of those classes. Adjusting validation approach for JsonPath processors to accomodate caching of expressions.
This commit is contained in:
parent
b1f971335a
commit
4d3cff3592
|
@ -51,19 +51,6 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
|||
|
||||
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
|
||||
|
||||
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||
String error = null;
|
||||
try {
|
||||
JsonPath compile = JsonPath.compile(input);
|
||||
} catch (InvalidPathException ipe) {
|
||||
error = ipe.toString();
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
|
||||
}
|
||||
};
|
||||
|
||||
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
||||
|
@ -99,4 +86,32 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
|||
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||
}
|
||||
|
||||
protected abstract static class JsonPathValidator implements Validator {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||
JsonPath compiledJsonPath = null;
|
||||
String error = null;
|
||||
try {
|
||||
if (isStale(subject, input)) {
|
||||
compiledJsonPath = JsonPath.compile(input);
|
||||
cacheComputedValue(subject, input, compiledJsonPath);
|
||||
}
|
||||
} catch (InvalidPathException ipe) {
|
||||
error = ipe.toString();
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).valid(error == null).explanation(error).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* An optional hook to act on the compute value
|
||||
*/
|
||||
abstract void cacheComputedValue(String subject, String input, JsonPath computedJsonPath);
|
||||
|
||||
/**
|
||||
* A hook for implementing classes to determine if a cached value is stale for a compiled JsonPath represented
|
||||
* by either a validation
|
||||
*/
|
||||
abstract boolean isStale(String subject, String input);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,13 @@ import com.jayway.jsonpath.DocumentContext;
|
|||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -38,12 +40,14 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
|
@ -92,6 +96,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private ConcurrentMap<String, Tuple<String, JsonPath>> cachedJsonPathMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
|
@ -145,12 +150,46 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
|||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(JSON_PATH_VALIDATOR)
|
||||
.addValidator(new JsonPathValidator() {
|
||||
@Override
|
||||
public void cacheComputedValue(String subject, String input, JsonPath computedJsonPath) {
|
||||
cachedJsonPathMap.put(subject, new Tuple<>(input, computedJsonPath));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStale(String subject, String input) {
|
||||
return cachedJsonPathMap.get(subject) == null;
|
||||
}
|
||||
})
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||
if (descriptor.isDynamic()) {
|
||||
if (!StringUtils.equals(oldValue, newValue)) {
|
||||
cachedJsonPathMap.remove(descriptor.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides cleanup of the map for any JsonPath values that may have been created. This will remove common values
|
||||
* shared between multiple instances, but will be regenerated when the next validation cycle occurs as a result of
|
||||
* isStale()
|
||||
*/
|
||||
@OnRemoved
|
||||
public void onRemoved() {
|
||||
for (PropertyDescriptor propertyDescriptor : getPropertyDescriptors()) {
|
||||
if (propertyDescriptor.isDynamic()) {
|
||||
cachedJsonPathMap.remove(propertyDescriptor.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import com.jayway.jsonpath.DocumentContext;
|
|||
import com.jayway.jsonpath.InvalidJsonException;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -38,6 +39,8 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
|
@ -53,7 +56,17 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
.name("JsonPath Expression")
|
||||
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
|
||||
.required(true)
|
||||
.addValidator(JSON_PATH_VALIDATOR)
|
||||
.addValidator(new JsonPathValidator() {
|
||||
@Override
|
||||
public void cacheComputedValue(String subject, String input, JsonPath computedJson) {
|
||||
JSON_PATH_MAP.put(input, computedJson);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStale(String subject, String input) {
|
||||
return JSON_PATH_MAP.get(input) == null;
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
|
||||
|
@ -63,6 +76,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
private List<PropertyDescriptor> properties;
|
||||
private Set<Relationship> relationships;
|
||||
|
||||
private static final ConcurrentMap<String, JsonPath> JSON_PATH_MAP = new ConcurrentHashMap();
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
|
@ -86,6 +101,16 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||
if (descriptor.equals(ARRAY_JSON_PATH_EXPRESSION)) {
|
||||
if (!StringUtils.equals(oldValue, newValue)) {
|
||||
// clear the cached item
|
||||
JSON_PATH_MAP.remove(oldValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
|
||||
final FlowFile original = processSession.get();
|
||||
|
@ -104,8 +129,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
|
||||
final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
|
||||
String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
|
||||
final JsonPath jsonPath = JSON_PATH_MAP.get(jsonPathExpression);
|
||||
getLogger().info("Using value {} for split ", new Object[]{jsonPathExpression});
|
||||
|
||||
final List<FlowFile> segments = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import net.minidev.json.JSONValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
|
||||
/**
|
||||
* Provides utilities for interacting with JSON elements
|
||||
*
|
||||
* @see <a href="http://json.org">http://json.org</a>
|
||||
*/
|
||||
public class JsonUtils {
|
||||
|
||||
/**
|
||||
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach
|
||||
* is preferred in determining whether or not a document is valid.
|
||||
* Performs a validation of the provided stream according to RFC 4627 as implemented by {@link net.minidev.json.parser.JSONParser#MODE_RFC4627}
|
||||
*
|
||||
* @param inputStream of content to be validated as JSON
|
||||
* @return true, if the content is valid within the bounds of the strictness specified; false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean isValidJson(InputStream inputStream) throws IOException {
|
||||
try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream)) {
|
||||
return JSONValue.isValidJsonStrict(inputStreamReader);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue