mirror of https://github.com/apache/nifi.git
Adding an abstract class to serve as a base class for JsonPath related processors and preferring this for much of the functionality present in JsonUtils.
This commit is contained in:
parent
81234f3a6d
commit
46bf048b24
|
@ -0,0 +1,110 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import com.jayway.jsonpath.Configuration;
|
||||||
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
import com.jayway.jsonpath.InvalidPathException;
|
||||||
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.Validator;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
|
import org.apache.nifi.processors.standard.util.JsonUtils;
|
||||||
|
import org.apache.nifi.stream.io.BufferedInputStream;
|
||||||
|
import org.apache.nifi.util.BooleanHolder;
|
||||||
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
|
||||||
|
*
|
||||||
|
* @see <a href="http://json.org">http://json.org</a>
|
||||||
|
* @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
|
||||||
|
*/
|
||||||
|
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
|
protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
|
||||||
|
|
||||||
|
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
||||||
|
@Override
|
||||||
|
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||||
|
String error = null;
|
||||||
|
try {
|
||||||
|
JsonPath compile = JsonPath.compile(input);
|
||||||
|
} catch (InvalidPathException ipe) {
|
||||||
|
error = ipe.toString();
|
||||||
|
}
|
||||||
|
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||||
|
|
||||||
|
final BooleanHolder validJsonHolder = new BooleanHolder(false);
|
||||||
|
processSession.read(flowFile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
validJsonHolder.set(JsonUtils.isValidJson(in));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||||
|
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
||||||
|
|
||||||
|
if (validJsonHolder.get()) {
|
||||||
|
processSession.read(flowFile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
||||||
|
DocumentContext ctx = JsonPath.parse(in);
|
||||||
|
contextHolder.set(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return contextHolder.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and
|
||||||
|
* {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar.
|
||||||
|
*
|
||||||
|
* @param obj item to be inspected if it is a scalar or a JSON element
|
||||||
|
* @return false, if the object is a supported type; true otherwise
|
||||||
|
*/
|
||||||
|
static boolean isJsonScalar(Object obj) {
|
||||||
|
// For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled as a JSON entity
|
||||||
|
return !(obj instanceof Map || obj instanceof List);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getResultRepresentation(Object jsonPathResult) {
|
||||||
|
if (isJsonScalar(jsonPathResult)) {
|
||||||
|
return jsonPathResult.toString();
|
||||||
|
}
|
||||||
|
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,10 +29,12 @@ import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ProcessorLog;
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
import org.apache.nifi.processor.*;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.processors.standard.util.JsonUtils;
|
|
||||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
import org.apache.nifi.util.StringUtils;
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
@ -57,7 +59,7 @@ import java.util.*;
|
||||||
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
|
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
|
||||||
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
|
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
|
||||||
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
|
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
|
||||||
public class EvaluateJsonPath extends AbstractProcessor {
|
public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
||||||
public static final String DESTINATION_CONTENT = "flowfile-content";
|
public static final String DESTINATION_CONTENT = "flowfile-content";
|
||||||
|
@ -142,7 +144,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
return new PropertyDescriptor.Builder()
|
return new PropertyDescriptor.Builder()
|
||||||
.name(propertyDescriptorName)
|
.name(propertyDescriptorName)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
|
.addValidator(JSON_PATH_VALIDATOR)
|
||||||
.required(false)
|
.required(false)
|
||||||
.dynamic(true)
|
.dynamic(true)
|
||||||
.build();
|
.build();
|
||||||
|
@ -175,7 +177,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
|
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
|
||||||
}
|
}
|
||||||
|
|
||||||
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile);
|
final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, flowFile);
|
||||||
|
|
||||||
if (documentContext == null) {
|
if (documentContext == null) {
|
||||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
|
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
|
||||||
|
@ -194,7 +196,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
|
||||||
try {
|
try {
|
||||||
Object result = documentContext.read(jsonPathExp);
|
Object result = documentContext.read(jsonPathExp);
|
||||||
if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
|
if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) {
|
||||||
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
|
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
|
||||||
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
|
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
|
||||||
processSession.transfer(flowFile, REL_FAILURE);
|
processSession.transfer(flowFile, REL_FAILURE);
|
||||||
|
@ -212,7 +214,7 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
|
final String resultRepresentation = getResultRepresentation(resultHolder.get());
|
||||||
switch (destination) {
|
switch (destination) {
|
||||||
case DESTINATION_ATTRIBUTE:
|
case DESTINATION_ATTRIBUTE:
|
||||||
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
||||||
|
@ -232,4 +234,5 @@ public class EvaluateJsonPath extends AbstractProcessor {
|
||||||
processSession.transfer(flowFile, REL_MATCH);
|
processSession.transfer(flowFile, REL_MATCH);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,9 +27,11 @@ import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ProcessorLog;
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
import org.apache.nifi.processor.*;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.processors.standard.util.JsonUtils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -44,13 +46,13 @@ import java.util.*;
|
||||||
+ "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' "
|
+ "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' "
|
||||||
+ "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
|
+ "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
|
||||||
+ "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
|
+ "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
|
||||||
public class SplitJson extends AbstractProcessor {
|
public class SplitJson extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
|
||||||
.name("JsonPath Expression")
|
.name("JsonPath Expression")
|
||||||
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
|
.description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.addValidator(JsonUtils.JSON_PATH_VALIDATOR)
|
.addValidator(JSON_PATH_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
|
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build();
|
||||||
|
@ -93,7 +95,7 @@ public class SplitJson extends AbstractProcessor {
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
|
||||||
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original);
|
final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, original);
|
||||||
|
|
||||||
if (documentContext == null) {
|
if (documentContext == null) {
|
||||||
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
|
||||||
|
@ -129,7 +131,7 @@ public class SplitJson extends AbstractProcessor {
|
||||||
split = processSession.write(split, new OutputStreamCallback() {
|
split = processSession.write(split, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
String resultSegmentContent = JsonUtils.getResultRepresentation(resultSegment);
|
String resultSegmentContent = getResultRepresentation(resultSegment);
|
||||||
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -16,79 +16,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard.util;
|
package org.apache.nifi.processors.standard.util;
|
||||||
|
|
||||||
import com.jayway.jsonpath.Configuration;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.InvalidPathException;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
|
||||||
import net.minidev.json.JSONValue;
|
import net.minidev.json.JSONValue;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.Validator;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
|
||||||
import org.apache.nifi.stream.io.BufferedInputStream;
|
|
||||||
import org.apache.nifi.util.BooleanHolder;
|
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides utilities for interacting with JSON elements and JsonPath expressions and results
|
* Provides utilities for interacting with JSON elements
|
||||||
*
|
*
|
||||||
* @see <a href="http://json.org">http://json.org</a>
|
* @see <a href="http://json.org">http://json.org</a>
|
||||||
* @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a>
|
|
||||||
*/
|
*/
|
||||||
public class JsonUtils {
|
public class JsonUtils {
|
||||||
|
|
||||||
static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider();
|
|
||||||
|
|
||||||
public static final Validator JSON_PATH_VALIDATOR = new Validator() {
|
|
||||||
@Override
|
|
||||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
|
||||||
String error = null;
|
|
||||||
try {
|
|
||||||
JsonPath compile = JsonPath.compile(input);
|
|
||||||
} catch (InvalidPathException ipe) {
|
|
||||||
error = ipe.toString();
|
|
||||||
}
|
|
||||||
return new ValidationResult.Builder().valid(error == null).explanation(error).build();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
|
||||||
|
|
||||||
final BooleanHolder validJsonHolder = new BooleanHolder(false);
|
|
||||||
processSession.read(flowFile, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
validJsonHolder.set(JsonUtils.isValidJson(in));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
|
||||||
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
|
||||||
|
|
||||||
if (validJsonHolder.get()) {
|
|
||||||
processSession.read(flowFile, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
|
|
||||||
DocumentContext ctx = JsonPath.parse(in);
|
|
||||||
contextHolder.set(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return contextHolder.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach
|
* JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach
|
||||||
* is preferred in determining whether or not a document is valid.
|
* is preferred in determining whether or not a document is valid.
|
||||||
|
@ -104,24 +44,4 @@ public class JsonUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and
|
|
||||||
* {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar.
|
|
||||||
*
|
|
||||||
* @param obj item to be inspected if it is a scalar or a JSON element
|
|
||||||
* @return false, if the object is a supported type; true otherwise
|
|
||||||
*/
|
|
||||||
public static boolean isJsonScalar(Object obj) {
|
|
||||||
// For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled as a JSON entity
|
|
||||||
return !(obj instanceof Map || obj instanceof List);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static String getResultRepresentation(Object jsonPathResult) {
|
|
||||||
if (JsonUtils.isJsonScalar(jsonPathResult)) {
|
|
||||||
return jsonPathResult.toString();
|
|
||||||
}
|
|
||||||
return JSON_PROVIDER.toJson(jsonPathResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue