From 46bf048b2407ddbd61024d2d0e69beccb2f72014 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Sat, 21 Feb 2015 20:26:58 -0500 Subject: [PATCH] Adding an abstract class to serve as a base class for JsonPath related processors and preferring this for much of the functionality present in JsonUtils. --- .../standard/AbstractJsonPathProcessor.java | 110 ++++++++++++++++++ .../processors/standard/EvaluateJsonPath.java | 17 +-- .../nifi/processors/standard/SplitJson.java | 14 ++- .../processors/standard/util/JsonUtils.java | 82 +------------ 4 files changed, 129 insertions(+), 94 deletions(-) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java new file mode 100644 index 0000000000..cb4dcf6ab2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.spi.json.JsonProvider; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.standard.util.JsonUtils; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.BooleanHolder; +import org.apache.nifi.util.ObjectHolder; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +/** + * Provides common functionality used for processors interacting and manipulating JSON data via JsonPath. + * + * @see http://json.org + * @see https://github.com/jayway/JsonPath + */ +public abstract class AbstractJsonPathProcessor extends AbstractProcessor { + + protected static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider(); + + public static final Validator JSON_PATH_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + String error = null; + try { + JsonPath compile = JsonPath.compile(input); + } catch (InvalidPathException ipe) { + error = ipe.toString(); + } + return new ValidationResult.Builder().valid(error == null).explanation(error).build(); + } + }; + + public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { + + final BooleanHolder validJsonHolder = new BooleanHolder(false); + processSession.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + validJsonHolder.set(JsonUtils.isValidJson(in)); + } + }); + + // Parse the document once into an associated context to support multiple path evaluations if specified + final ObjectHolder contextHolder = new ObjectHolder<>(null); + + if (validJsonHolder.get()) { + processSession.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) { + DocumentContext ctx = JsonPath.parse(in); + contextHolder.set(ctx); + } + } + }); + } + + return contextHolder.get(); + } + + /** + * Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and + * {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar. + * + * @param obj item to be inspected if it is a scalar or a JSON element + * @return false, if the object is a supported type; true otherwise + */ + static boolean isJsonScalar(Object obj) { + // For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled as a JSON entity + return !(obj instanceof Map || obj instanceof List); + } + + public static String getResultRepresentation(Object jsonPathResult) { + if (isJsonScalar(jsonPathResult)) { + return jsonPathResult.toString(); + } + return JSON_PROVIDER.toJson(jsonPathResult); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 65266fffd3..f7caa68f03 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -29,10 +29,12 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.*; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processors.standard.util.JsonUtils; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StringUtils; @@ -57,7 +59,7 @@ import java.util.*; + "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. " + "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with " + "empty strings as the value, and the FlowFile will always be routed to 'matched.'") -public class EvaluateJsonPath extends AbstractProcessor { +public class EvaluateJsonPath extends AbstractJsonPathProcessor { public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; public static final String DESTINATION_CONTENT = "flowfile-content"; @@ -142,7 +144,7 @@ public class EvaluateJsonPath extends AbstractProcessor { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .expressionLanguageSupported(false) - .addValidator(JsonUtils.JSON_PATH_VALIDATOR) + .addValidator(JSON_PATH_VALIDATOR) .required(false) .dynamic(true) .build(); @@ -175,7 +177,7 @@ public class EvaluateJsonPath extends AbstractProcessor { returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR; } - final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile); + final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, flowFile); if (documentContext == null) { logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile}); @@ -194,7 +196,7 @@ public class EvaluateJsonPath extends AbstractProcessor { final ObjectHolder resultHolder = new ObjectHolder<>(null); try { Object result = documentContext.read(jsonPathExp); - if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) { + if (returnType.equals(RETURN_TYPE_SCALAR) && !isJsonScalar(result)) { logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()}); processSession.transfer(flowFile, REL_FAILURE); @@ -212,7 +214,7 @@ public class EvaluateJsonPath extends AbstractProcessor { } } - final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get()); + final String resultRepresentation = getResultRepresentation(resultHolder.get()); switch (destination) { case DESTINATION_ATTRIBUTE: jsonPathResults.put(jsonPathAttrKey, resultRepresentation); @@ -232,4 +234,5 @@ public class EvaluateJsonPath extends AbstractProcessor { processSession.transfer(flowFile, REL_MATCH); } + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 78e1b2a950..8df2de0a2c 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -27,9 +27,11 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.*; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processors.standard.util.JsonUtils; import java.io.IOException; import java.io.OutputStream; @@ -44,13 +46,13 @@ import java.util.*; + "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split,' " + "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or " + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.") -public class SplitJson extends AbstractProcessor { +public class SplitJson extends AbstractJsonPathProcessor { public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() .name("JsonPath Expression") .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.") .required(true) - .addValidator(JsonUtils.JSON_PATH_VALIDATOR) + .addValidator(JSON_PATH_VALIDATOR) .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); @@ -93,7 +95,7 @@ public class SplitJson extends AbstractProcessor { final ProcessorLog logger = getLogger(); - final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original); + final DocumentContext documentContext = validateAndEstablishJsonContext(processSession, original); if (documentContext == null) { logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original}); @@ -129,7 +131,7 @@ public class SplitJson extends AbstractProcessor { split = processSession.write(split, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - String resultSegmentContent = JsonUtils.getResultRepresentation(resultSegment); + String resultSegmentContent = getResultRepresentation(resultSegment); out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); } }); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java index 2174c1ef7e..68b18b8ff5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java @@ -16,79 +16,19 @@ */ package org.apache.nifi.processors.standard.util; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.InvalidPathException; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.spi.json.JsonProvider; import net.minidev.json.JSONValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.util.BooleanHolder; -import org.apache.nifi.util.ObjectHolder; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.List; -import java.util.Map; /** - * Provides utilities for interacting with JSON elements and JsonPath expressions and results + * Provides utilities for interacting with JSON elements * * @see http://json.org - * @see https://github.com/jayway/JsonPath */ public class JsonUtils { - static final JsonProvider JSON_PROVIDER = Configuration.defaultConfiguration().jsonProvider(); - - public static final Validator JSON_PATH_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - String error = null; - try { - JsonPath compile = JsonPath.compile(input); - } catch (InvalidPathException ipe) { - error = ipe.toString(); - } - return new ValidationResult.Builder().valid(error == null).explanation(error).build(); - } - }; - - public static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { - - final BooleanHolder validJsonHolder = new BooleanHolder(false); - processSession.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - validJsonHolder.set(JsonUtils.isValidJson(in)); - } - }); - - // Parse the document once into an associated context to support multiple path evaluations if specified - final ObjectHolder contextHolder = new ObjectHolder<>(null); - - if (validJsonHolder.get()) { - processSession.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) { - DocumentContext ctx = JsonPath.parse(in); - contextHolder.set(ctx); - } - } - }); - } - - return contextHolder.get(); - } - /** * JSONValue#isValidJson is permissive to the degree of the Smart JSON definition, accordingly a strict JSON approach * is preferred in determining whether or not a document is valid. @@ -104,24 +44,4 @@ public class JsonUtils { } } - /** - * Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and - * {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar. - * - * @param obj item to be inspected if it is a scalar or a JSON element - * @return false, if the object is a supported type; true otherwise - */ - public static boolean isJsonScalar(Object obj) { - // For the default provider, JsonSmartJsonProvider, a Map or List is able to be handled as a JSON entity - return !(obj instanceof Map || obj instanceof List); - } - - - public static String getResultRepresentation(Object jsonPathResult) { - if (JsonUtils.isJsonScalar(jsonPathResult)) { - return jsonPathResult.toString(); - } - return JSON_PROVIDER.toJson(jsonPathResult); - } - }